YAKL
YAKL_streams_events.h
Go to the documentation of this file.
1 
2 #pragma once
3 
5 namespace yakl {
6 
// Compile-time switch mirroring the YAKL_ENABLE_STREAMS preprocessor macro:
// true when the macro is defined, false otherwise. The stream classes below
// consult it with `if constexpr` before creating or destroying real streams.
#if defined(YAKL_ENABLE_STREAMS)
  constexpr bool streams_enabled = true;
#else
  constexpr bool streams_enabled = false;
#endif
13 
14  #if defined(YAKL_ARCH_CUDA)
15 
16  class Stream;
17  class Event;
18 
19  class Stream {
20  protected:
21  cudaStream_t my_stream;
22  int * refCount; // Pointer shared by multiple copies of this Array to keep track of allcation / free
23 
24  void nullify() { my_stream = 0; refCount = nullptr; }
25 
26  public:
27 
28  Stream() { nullify(); }
29  Stream(cudaStream_t cuda_stream) { nullify(); my_stream = cuda_stream; }
30  ~Stream() { destroy(); }
31 
32  Stream(Stream const &rhs) {
33  my_stream = rhs.my_stream;
34  refCount = rhs.refCount;
35  if (refCount != nullptr) (*refCount)++;
36  }
37  Stream(Stream &&rhs) {
38  my_stream = rhs.my_stream;
39  refCount = rhs.refCount;
40  rhs.nullify();
41  }
42  Stream & operator=(Stream const &rhs) {
43  if (this != &rhs) {
44  destroy();
45  my_stream = rhs.my_stream;
46  refCount = rhs.refCount;
47  if (refCount != nullptr) (*refCount)++;
48  }
49  return *this;
50  }
51  Stream & operator=(Stream &&rhs) {
52  if (this != &rhs) {
53  destroy();
54  my_stream = rhs.my_stream;
55  refCount = rhs.refCount;
56  rhs.nullify();
57  }
58  return *this;
59  }
60 
61  void create() {
62  if (refCount == nullptr) {
63  refCount = new int;
64  (*refCount) = 1;
65  if constexpr (streams_enabled) cudaStreamCreate( &my_stream );
66  }
67  }
68 
69  void destroy() {
70  if (refCount != nullptr) {
71  (*refCount)--;
72  if ( (*refCount) == 0 ) {
73  if constexpr (streams_enabled) cudaStreamDestroy( my_stream );
74  delete refCount;
75  nullify();
76  }
77  }
78  }
79 
80  cudaStream_t get_real_stream() { return my_stream; }
81  bool operator==(Stream stream) const { return my_stream == stream.get_real_stream(); }
82  inline void wait_on_event(Event event);
83  bool is_default_stream() { return my_stream == 0; }
84  bool completed() { return cudaStreamQuery( my_stream ) == cudaSuccess; }
85  void fence() { if(!completed()) cudaStreamSynchronize(my_stream); }
86  };
87 
88 
89  class Event {
90  protected:
91  cudaEvent_t my_event;
92  int * refCount; // Pointer shared by multiple copies of this Array to keep track of allcation / free
93 
94  void nullify() { my_event = 0; refCount = nullptr; }
95 
96  public:
97 
98  Event() { nullify(); }
99  ~Event() { destroy(); }
100 
101  Event(Event const &rhs) {
102  my_event = rhs.my_event;
103  refCount = rhs.refCount;
104  if (refCount != nullptr) (*refCount)++;
105  }
106  Event(Event &&rhs) {
107  my_event = rhs.my_event;
108  refCount = rhs.refCount;
109  rhs.nullify();
110  }
111  Event & operator=(Event const &rhs) {
112  if (this != &rhs) {
113  destroy();
114  my_event = rhs.my_event;
115  refCount = rhs.refCount;
116  if (refCount != nullptr) (*refCount)++;
117  }
118  return *this;
119  }
120  Event & operator=(Event &&rhs) {
121  if (this != &rhs) {
122  destroy();
123  my_event = rhs.my_event;
124  refCount = rhs.refCount;
125  rhs.nullify();
126  }
127  return *this;
128  }
129 
130  void create() {
131  if (refCount == nullptr) {
132  refCount = new int;
133  (*refCount) = 1;
134  cudaEventCreateWithFlags( &my_event, cudaEventDisableTiming );
135  }
136  }
137 
138  void destroy() {
139  if (refCount != nullptr) {
140  (*refCount)--;
141  if ( (*refCount) == 0 ) { cudaEventDestroy( my_event ); delete refCount; nullify(); }
142  }
143  }
144 
145  inline void record(Stream stream);
146  cudaEvent_t get_real_event() { return my_event; }
147  bool operator==(Event event) const { return my_event == event.get_real_event(); }
148  bool completed() { return cudaEventQuery( my_event ) == cudaSuccess; }
149  void fence() { if(!completed()) cudaEventSynchronize(my_event); }
150  };
151 
152 
153  inline void Event::record(Stream stream) {
154  create();
155  cudaEventRecord( my_event , stream.get_real_stream() );
156  }
157 
158 
159  inline void Stream::wait_on_event(Event event) {
160  cudaStreamWaitEvent( my_stream , event.get_real_event() , 0 );
161  }
162 
163  #elif defined(YAKL_ARCH_HIP)
164 
165  class Stream;
166  class Event;
167 
168  class Stream {
169  protected:
170  hipStream_t my_stream;
171  int * refCount; // Pointer shared by multiple copies of this Array to keep track of allcation / free
172 
173  void nullify() { my_stream = 0; refCount = nullptr; }
174 
175  public:
176 
177  Stream() { nullify(); }
178  Stream(hipStream_t hip_stream) { nullify(); my_stream = hip_stream; }
179  ~Stream() { destroy(); }
180 
181  Stream(Stream const &rhs) {
182  my_stream = rhs.my_stream;
183  refCount = rhs.refCount;
184  if (refCount != nullptr) (*refCount)++;
185  }
186  Stream(Stream &&rhs) {
187  my_stream = rhs.my_stream;
188  refCount = rhs.refCount;
189  rhs.nullify();
190  }
191  Stream & operator=(Stream const &rhs) {
192  if (this != &rhs) {
193  destroy();
194  my_stream = rhs.my_stream;
195  refCount = rhs.refCount;
196  if (refCount != nullptr) (*refCount)++;
197  }
198  return *this;
199  }
200  Stream & operator=(Stream &&rhs) {
201  if (this != &rhs) {
202  destroy();
203  my_stream = rhs.my_stream;
204  refCount = rhs.refCount;
205  rhs.nullify();
206  }
207  return *this;
208  }
209 
210  void create() {
211  if (refCount == nullptr) {
212  refCount = new int;
213  (*refCount) = 1;
214  if constexpr (streams_enabled) hipStreamCreateWithFlags( &my_stream, hipStreamNonBlocking );
215  }
216  }
217 
218  void destroy() {
219  if (refCount != nullptr) {
220  (*refCount)--;
221  if ( (*refCount) == 0 ) {
222  if constexpr (streams_enabled) hipStreamDestroy( my_stream );
223  delete refCount;
224  nullify();
225  }
226  }
227  }
228 
229  hipStream_t get_real_stream() { return my_stream; }
230  bool operator==(Stream stream) const { return my_stream == stream.get_real_stream(); }
231  inline void wait_on_event(Event event);
232  bool is_default_stream() { return my_stream == 0; }
233  bool completed() { return hipStreamQuery( my_stream ) == hipSuccess; }
234  void fence() { if(!completed()) hipStreamSynchronize(my_stream); }
235  };
236 
237 
238  class Event {
239  protected:
240  hipEvent_t my_event;
241  int * refCount; // Pointer shared by multiple copies of this Array to keep track of allcation / free
242 
243  void nullify() { my_event = 0; refCount = nullptr; }
244 
245  public:
246 
247  Event() { nullify(); }
248  ~Event() { destroy(); }
249 
250  Event(Event const &rhs) {
251  my_event = rhs.my_event;
252  refCount = rhs.refCount;
253  if (refCount != nullptr) (*refCount)++;
254  }
255  Event(Event &&rhs) {
256  my_event = rhs.my_event;
257  refCount = rhs.refCount;
258  rhs.nullify();
259  }
260  Event & operator=(Event const &rhs) {
261  if (this != &rhs) {
262  destroy();
263  my_event = rhs.my_event;
264  refCount = rhs.refCount;
265  if (refCount != nullptr) (*refCount)++;
266  }
267  return *this;
268  }
269  Event & operator=(Event &&rhs) {
270  if (this != &rhs) {
271  destroy();
272  my_event = rhs.my_event;
273  refCount = rhs.refCount;
274  rhs.nullify();
275  }
276  return *this;
277  }
278 
279  void create() {
280  if (refCount == nullptr) {
281  refCount = new int;
282  (*refCount) = 1;
283  hipEventCreateWithFlags( &my_event, hipEventDisableTiming );
284  }
285  }
286 
287  void destroy() {
288  if (refCount != nullptr) {
289  (*refCount)--;
290  if ( (*refCount) == 0 ) { hipEventDestroy( my_event ); delete refCount; nullify(); }
291  }
292  }
293 
294  inline void record(Stream stream);
295  hipEvent_t get_real_event() { return my_event; }
296  bool operator==(Event event) const { return my_event == event.get_real_event(); }
297  bool completed() { return hipEventQuery( my_event ) == hipSuccess; }
298  void fence() { if(!completed()) hipEventSynchronize(my_event); }
299  };
300 
301 
302  inline void Event::record(Stream stream) {
303  create();
304  hipEventRecord( my_event , stream.get_real_stream() );
305  }
306 
307 
308  inline void Stream::wait_on_event(Event event) {
309  hipStreamWaitEvent( my_stream , event.get_real_event() , 0 );
310  }
311 
312  #elif defined(YAKL_ARCH_SYCL)
313 
314  class Stream;
315  class Event;
316 
317  class Stream {
318  protected:
319  std::shared_ptr<sycl::queue> my_stream{nullptr};
320 
321  public:
322 
323  Stream() { }
324  Stream(sycl::queue &sycl_queue) { my_stream = std::make_shared<sycl::queue>(sycl_queue); }
325 
326  Stream( const Stream& ) = default;
327  Stream( Stream&& ) noexcept = default;
328  Stream& operator=( const Stream& ) = default;
329  Stream& operator=( Stream&& ) noexcept = default;
330 
331  void create() {
332  // Ensure these multi-streams use the same device & context as default-stream
333  if constexpr (streams_enabled) {
334  my_stream = std::make_shared<sycl::queue>( sycl_default_stream().get_context() ,
335  sycl_default_stream().get_device() ,
336  asyncHandler ,
337  sycl::property_list{sycl::property::queue::in_order{}} );
338  }
339  }
340 
341  sycl::queue & get_real_stream() const { return (my_stream != nullptr) ? *my_stream : sycl_default_stream(); }
342  bool operator==(Stream stream) const { return get_real_stream() == stream.get_real_stream(); }
343  inline void wait_on_event(Event event);
344  bool is_default_stream() const { return get_real_stream() == sycl_default_stream(); }
345  bool completed() {
346  /* macro SYCL_EXT_ONEAPI_QUEUE_EMPTY is defined by the supported compilers */
347  #if defined(SYCL_EXT_ONEAPI_QUEUE_EMPTY)
348  return get_real_stream().ext_oneapi_empty();
349  #else
350  return false;
351  #endif
352  }
353  void fence() { if(!completed()) get_real_stream().wait(); }
354  };
355 
356 
357  class Event {
358  protected:
359  sycl::event my_event;
360 
361  public:
362 
363  Event() { }
364  ~Event() { }
365 
366  Event(Event const &rhs) { my_event = rhs.my_event; }
367  Event(Event &&rhs) { my_event = rhs.my_event; }
368  Event & operator=(Event const &rhs) { if (this != &rhs) { my_event = rhs.my_event; }; return *this; }
369  Event & operator=(Event &&rhs) { if (this != &rhs) { my_event = rhs.my_event; }; return *this; }
370 
371  void create() { }
372  void destroy() { }
373 
374  inline void record(Stream stream);
375  sycl::event & get_real_event() { return my_event; }
376  bool operator==(Event event) const { return my_event == event.get_real_event(); }
377  bool completed() { return my_event.get_info<sycl::info::event::command_execution_status>() == sycl::info::event_command_status::complete; }
378  void fence() { if(!completed()) my_event.wait(); }
379  };
380 
381 
382  inline void Event::record(Stream stream) { my_event = stream.get_real_stream().ext_oneapi_submit_barrier(); }
383 
384  inline void Stream::wait_on_event(Event event) { this->get_real_stream().ext_oneapi_submit_barrier({event.get_real_event()}); }
385 
386 
387  #else
388 
389  struct Stream;
390  struct Event;
391 
394  struct Stream {
396  void create() { }
398  void destroy() { }
400  bool operator==(Stream stream) const { return true; }
402  inline void wait_on_event(Event event);
404  bool is_default_stream() { return true; }
406  bool completed() { return true; }
408  void fence() { }
409  };
410 
413  struct Event {
415  void create() { }
417  void destroy() { }
419  inline void record(Stream stream);
421  bool operator==(Event event) const { return true; }
423  bool completed() { return true; }
425  void fence() { }
426  };
427 
428  inline void Event::record(Stream stream) { }
429  inline void Stream::wait_on_event(Event event) { }
430 
431  #endif
432 
433 
435  inline Stream create_stream() { Stream stream; stream.create(); return stream; }
436 
438  inline Event record_event(Stream stream = Stream()) { Event event; event.create(); event.record(stream); return event; }
439 
440 
446  struct StreamList {
448  std::vector<Stream> *list;
449  std::mutex mtx_loc;
452  YAKL_EXECUTE_ON_HOST_ONLY( list = new std::vector<Stream>; )
453  }
455  YAKL_EXECUTE_ON_HOST_ONLY( delete list; )
456  }
458  void push_back(Stream stream) {
459  mtx_loc.lock();
460  list->push_back(stream);
461  mtx_loc.unlock();
462  }
464  int size() const { return list->size(); }
466  bool empty() const { return list->empty(); }
468  Stream operator[] (int i) { return (*list)[i]; }
470  std::vector<Stream> get_all_streams() const { return *list; }
471  };
472 
473 }
yakl::Event
Implements the functionality of an event within a stream. The event is not created until the Event::c...
Definition: YAKL_streams_events.h:413
yakl::Stream::fence
void fence()
Pause all CPU work until all existing work in this stream completes.
Definition: YAKL_streams_events.h:408
yakl::Event::fence
void fence()
Pause all CPU work until this event has completed.
Definition: YAKL_streams_events.h:425
yakl::Stream::create
void create()
Create the stream.
Definition: YAKL_streams_events.h:396
yakl::Stream
Implements the functionality of a stream for parallel kernel execution. If the Stream::create() metho...
Definition: YAKL_streams_events.h:394
yakl::Stream::operator==
bool operator==(Stream stream) const
Determine if this stream is the same as the passed stream.
Definition: YAKL_streams_events.h:400
__YAKL_NAMESPACE_WRAPPER_END__
#define __YAKL_NAMESPACE_WRAPPER_END__
Definition: YAKL.h:20
yakl::StreamList::get_all_streams
std::vector< Stream > get_all_streams() const
Get a std::vector of the streams in the list.
Definition: YAKL_streams_events.h:470
yakl::streams_enabled
constexpr bool streams_enabled
If the CPP Macro YAKL_ENABLE_STREAMS is defined, then this bool is set to true
Definition: YAKL_streams_events.h:11
yakl::Event::destroy
void destroy()
Destroy the event.
Definition: YAKL_streams_events.h:417
__YAKL_NAMESPACE_WRAPPER_BEGIN__
#define __YAKL_NAMESPACE_WRAPPER_BEGIN__
Definition: YAKL.h:19
yakl::StreamList::mtx_loc
std::mutex mtx_loc
Definition: YAKL_streams_events.h:449
yakl::Stream::wait_on_event
void wait_on_event(Event event)
Tell the stream to wait until the passed event completes before continuing work in the stream.
Definition: YAKL_streams_events.h:429
YAKL_INLINE
#define YAKL_INLINE
Used to decorate functions called from kernels (parallel_for and parallel_outer) or from CPU function...
Definition: YAKL_defines.h:140
yakl::Event::operator==
bool operator==(Event event) const
Determine if this event is the same as the passed event.
Definition: YAKL_streams_events.h:421
yakl::StreamList
Implements a list of Stream objects. Needs to store a pointer to avoid construction on the device sin...
Definition: YAKL_streams_events.h:446
yakl::Event::create
void create()
Create the event.
Definition: YAKL_streams_events.h:415
YAKL_EXECUTE_ON_HOST_ONLY
#define YAKL_EXECUTE_ON_HOST_ONLY(...)
[NOT COMMONLY USED] Macro function used to determine if the current code is compiling for the host.
Definition: YAKL_defines.h:153
yakl::StreamList::push_back
void push_back(Stream stream)
Add a stream to the end of the list.
Definition: YAKL_streams_events.h:458
yakl::record_event
Event record_event(Stream stream=Stream())
Create, record, and return an event using the given stream.
Definition: YAKL_streams_events.h:438
yakl::Event::completed
bool completed()
Determine if this event has completed.
Definition: YAKL_streams_events.h:423
yakl::create_stream
Stream create_stream()
Create and return a Stream object. It is guaranteed to not be the default stream.
Definition: YAKL_streams_events.h:435
yakl::Stream::is_default_stream
bool is_default_stream()
Determine whether this stream is the default stream.
Definition: YAKL_streams_events.h:404
yakl::Stream::destroy
void destroy()
Destroy the stream.
Definition: YAKL_streams_events.h:398
yakl
yakl::StreamList::operator[]
Stream operator[](int i)
Access the stream at the requested index.
Definition: YAKL_streams_events.h:468
yakl::Stream::completed
bool completed()
Determine if the task enqueued to the stream has completed.
Definition: YAKL_streams_events.h:406
yakl::StreamList::empty
bool empty() const
Determine whether the list is empty (has no streams)
Definition: YAKL_streams_events.h:466
yakl::StreamList::size
int size() const
Get the number of streams in the list.
Definition: YAKL_streams_events.h:464
yakl::StreamList::StreamList
YAKL_INLINE StreamList()
Create an empty stream list.
Definition: YAKL_streams_events.h:451
yakl::StreamList::~StreamList
YAKL_INLINE ~StreamList()
Definition: YAKL_streams_events.h:454
yakl::Event::record
void record(Stream stream)
Record an event in the passed stream.
Definition: YAKL_streams_events.h:428