C++ Channel
Loading...
Searching...
No Matches
channel.hpp
Go to the documentation of this file.
1// Copyright (C) 2020-2025 Andrei Avram
2
3#ifndef MSD_CHANNEL_CHANNEL_HPP_
4#define MSD_CHANNEL_CHANNEL_HPP_
5
7#include "nodiscard.hpp"
8#include "storage.hpp"
9
10#include <condition_variable>
11#include <cstdlib>
12#include <mutex>
13#include <stdexcept>
14#include <type_traits>
15
18namespace msd {
19
/**
 * @brief Exception thrown if trying to write on closed channel.
 */
class closed_channel : public std::runtime_error {
   public:
    /**
     * @brief Constructs the exception with an error message.
     *
     * @param msg Text forwarded to std::runtime_error and later reported by what().
     */
    explicit closed_channel(const char* msg)
        : std::runtime_error{msg}
    {
    }
};
32
39template <typename T>
41
54template <typename T>
59 static constexpr bool value = std::is_default_constructible<T>::value && std::is_move_constructible<T>::value &&
60 std::is_move_assignable<T>::value && std::is_destructible<T>::value;
61};
62
/**
 * @brief Trait to check if a storage type has a static capacity member.
 *
 * Primary template: selected when `Storage::capacity` is not a valid expression.
 */
template <typename, typename = void>
struct is_static_storage : std::false_type {
};

/**
 * @brief Specialization selected when `Storage::capacity` is a valid expression.
 *
 * The decltype operand only has to be well-formed; its value is discarded and its type is void,
 * which matches the defaulted second template argument of the primary template.
 */
template <typename Storage>
struct is_static_storage<Storage, decltype(static_cast<void>(Storage::capacity))> : std::true_type {
};
76
86template <typename T, typename Storage = default_storage<T>>
87class channel {
88 public:
89 static_assert(is_supported_type<T>::value, "Type T does not meet all requirements.");
90
94 using value_type = T;
95
100
104 using size_type = std::size_t;
105
111 template <typename S = Storage, typename std::enable_if<is_static_storage<S>::value, int>::type = 0>
112 constexpr channel() : capacity_{Storage::capacity}
113 {
114 }
115
119 template <typename S = Storage, typename std::enable_if<!is_static_storage<S>::value, int>::type = 0>
120 constexpr channel() : storage_{0}
121 {
122 }
123
129 template <typename S = Storage, typename std::enable_if<!is_static_storage<S>::value, int>::type = 0>
130 explicit constexpr channel(const size_type capacity) : storage_{capacity}, capacity_{capacity}
131 {
132 }
133
142 template <typename Type, typename Store>
144 channel<typename std::decay<Type>::type, Store>& chan, Type&& value);
145
153 template <typename Type, typename Store>
155
164 template <typename Type>
165 bool write(Type&& value)
166 {
167 {
168 std::unique_lock<std::mutex> lock{mtx_};
169 wait_before_write(lock);
170
171 if (is_closed_) {
172 return false;
173 }
174
175 storage_.push_back(std::forward<Type>(value));
176 }
177
178 cnd_.notify_one();
179
180 return true;
181 }
182
190 bool read(T& out)
191 {
192 {
193 std::unique_lock<std::mutex> lock{mtx_};
194 wait_before_read(lock);
195
196 if (storage_.size() == 0 && is_closed_) {
197 return false;
198 }
199
200 storage_.pop_front(out);
201 }
202
203 cnd_.notify_one();
204
205 return true;
206 }
207
213 NODISCARD size_type size() const noexcept
214 {
215 std::unique_lock<std::mutex> lock{mtx_};
216 return storage_.size();
217 }
218
225 NODISCARD bool empty() const noexcept
226 {
227 std::unique_lock<std::mutex> lock{mtx_};
228 return storage_.size() == 0;
229 }
230
234 void close() noexcept
235 {
236 {
237 std::unique_lock<std::mutex> lock{mtx_};
238 is_closed_ = true;
239 }
240 cnd_.notify_all();
241 }
242
249 NODISCARD bool closed() const noexcept
250 {
251 std::unique_lock<std::mutex> lock{mtx_};
252 return is_closed_;
253 }
254
261 NODISCARD bool drained() noexcept
262 {
263 std::unique_lock<std::mutex> lock{mtx_};
264 return storage_.size() == 0 && is_closed_;
265 }
266
273
279 iterator end() noexcept { return blocking_iterator<channel<T, Storage>>{*this, true}; }
280
281 channel(const channel&) = delete;
282 channel& operator=(const channel&) = delete;
283 channel(channel&&) = delete;
284 channel& operator=(channel&&) = delete;
285 virtual ~channel() = default;
286
287 private:
288 Storage storage_;
289 std::condition_variable cnd_;
290 mutable std::mutex mtx_;
291 std::size_t capacity_{};
292 bool is_closed_{};
293
294 void wait_before_read(std::unique_lock<std::mutex>& lock)
295 {
296 cnd_.wait(lock, [this]() { return storage_.size() > 0 || is_closed_; });
297 };
298
299 void wait_before_write(std::unique_lock<std::mutex>& lock)
300 {
301 if (capacity_ > 0) {
302 cnd_.wait(lock, [this]() { return storage_.size() < capacity_; });
303 }
304 }
305};
306
310template <typename T, typename Storage>
311channel<typename std::decay<T>::type, Storage>& operator<<(channel<typename std::decay<T>::type, Storage>& chan,
312 T&& value)
313{
314 if (!chan.write(std::forward<T>(value))) {
315 throw closed_channel{"cannot write on closed channel"};
316 }
317
318 return chan;
319}
320
324template <typename T, typename Storage>
331
332} // namespace msd
333
334#endif // MSD_CHANNEL_CHANNEL_HPP_
channel< typename std::decay< T >::type, Storage > & operator<<(channel< typename std::decay< T >::type, Storage > &chan, T &&value)
Pushes an element into the channel.
Definition channel.hpp:311
channel< T, Storage > & operator>>(channel< T, Storage > &chan, T &out)
Pops an element from the channel.
Definition channel.hpp:325
An iterator that blocks the current thread, waiting to fetch elements from the channel.
Definition blocking_iterator.hpp:21
Thread-safe container for sharing data between threads.
Definition channel.hpp:87
friend channel< typename std::decay< Type >::type, Store > & operator<<(channel< typename std::decay< Type >::type, Store > &chan, Type &&value)
Pushes an element into the channel.
NODISCARD bool drained() noexcept
Checks if the channel has been closed and is empty.
Definition channel.hpp:261
constexpr channel(const size_type capacity)
Creates a buffered channel if Storage is not static (does not have a static capacity member).
Definition channel.hpp:130
constexpr channel()
Creates a buffered channel if Storage is static (has a static capacity member).
Definition channel.hpp:112
NODISCARD bool closed() const noexcept
Checks if the channel has been closed.
Definition channel.hpp:249
T value_type
The type of elements stored in the channel.
Definition channel.hpp:94
bool write(Type &&value)
Pushes an element into the channel.
Definition channel.hpp:165
std::size_t size_type
The type used to represent sizes and counts.
Definition channel.hpp:104
NODISCARD bool empty() const noexcept
Checks if the channel is empty.
Definition channel.hpp:225
void close() noexcept
Closes the channel, no longer accepting new elements.
Definition channel.hpp:234
friend channel< Type, Store > & operator>>(channel< Type, Store > &chan, Type &out)
Pops an element from the channel.
bool read(T &out)
Pops an element from the channel.
Definition channel.hpp:190
iterator end() noexcept
Returns an iterator representing the end of the channel.
Definition channel.hpp:279
NODISCARD size_type size() const noexcept
Returns the current size of the channel.
Definition channel.hpp:213
iterator begin() noexcept
Returns an iterator to the beginning of the channel.
Definition channel.hpp:272
Exception thrown if trying to write on closed channel.
Definition channel.hpp:23
closed_channel(const char *msg)
Constructs the exception with an error message.
Definition channel.hpp:30
A FIFO queue storage using std::queue.
Definition storage.hpp:23
Trait to check if a storage type has a static capacity member.
Definition channel.hpp:67
Trait to check if a type is supported by msd::channel.
Definition channel.hpp:55
static constexpr bool value
Indicates if the type meets all channel requirements.
Definition channel.hpp:59