C++ Channel
Loading...
Searching...
No Matches
channel.hpp
1// Copyright (C) 2020-2025 Andrei Avram
2
3#ifndef MSD_CHANNEL_HPP_
4#define MSD_CHANNEL_HPP_
5
6#include <atomic>
7#include <condition_variable>
8#include <cstdlib>
9#include <mutex>
10#include <queue>
11#include <stdexcept>
12#include <type_traits>
13
14#include "blocking_iterator.hpp"
15
16namespace msd {
17
18#if (__cplusplus >= 201703L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L))
19#define NODISCARD [[nodiscard]]
20#else
21#define NODISCARD
22#endif
23
27class closed_channel : public std::runtime_error {
28 public:
34 explicit closed_channel(const char* msg) : std::runtime_error{msg} {}
35};
36
44template <typename T>
45class channel {
46 public:
50 using value_type = T;
51
56
60 using size_type = std::size_t;
61
65 constexpr channel() = default;
66
72 explicit constexpr channel(const size_type capacity) : cap_{capacity} {}
73
79 template <typename Type>
80 friend channel<typename std::decay<Type>::type>& operator<<(channel<typename std::decay<Type>::type>&, Type&&);
81
87 template <typename Type>
89
95 NODISCARD size_type constexpr size() const noexcept { return size_; }
96
103 NODISCARD bool constexpr empty() const noexcept { return size_ == 0; }
104
108 void close() noexcept
109 {
110 {
111 std::unique_lock<std::mutex> lock{mtx_};
112 is_closed_.store(true, std::memory_order_seq_cst);
113 }
114 cnd_.notify_all();
115 }
116
123 NODISCARD bool closed() const noexcept { return is_closed_.load(std::memory_order_seq_cst); }
124
130 iterator begin() noexcept { return blocking_iterator<channel<T>>{*this}; }
131
137 iterator end() noexcept { return blocking_iterator<channel<T>>{*this}; }
138
142 channel(const channel&) = delete;
143 channel& operator=(const channel&) = delete;
144 channel(channel&&) = delete;
145 channel& operator=(channel&&) = delete;
146 virtual ~channel() = default;
147
148 private:
149 const size_type cap_{0};
150 std::queue<T> queue_;
151 std::atomic<std::size_t> size_{0};
152 std::mutex mtx_;
153 std::condition_variable cnd_;
154 std::atomic<bool> is_closed_{false};
155
156 void waitBeforeRead(std::unique_lock<std::mutex>& lock)
157 {
158 cnd_.wait(lock, [this]() { return !empty() || closed(); });
159 };
160
161 void waitBeforeWrite(std::unique_lock<std::mutex>& lock)
162 {
163 if (cap_ > 0 && size_ == cap_) {
164 cnd_.wait(lock, [this]() { return size_ < cap_; });
165 }
166 }
167
168 friend class blocking_iterator<channel>;
169};
170
171template <typename T>
172channel<typename std::decay<T>::type>& operator<<(channel<typename std::decay<T>::type>& chan, T&& value)
173{
174 {
175 std::unique_lock<std::mutex> lock{chan.mtx_};
176 chan.waitBeforeWrite(lock);
177
178 if (chan.closed()) {
179 throw closed_channel{"cannot write on closed channel"};
180 }
181
182 chan.queue_.push(std::forward<T>(value));
183 ++chan.size_;
184 }
185
186 chan.cnd_.notify_one();
187
188 return chan;
189}
190
191template <typename T>
192channel<T>& operator>>(channel<T>& chan, T& out)
193{
194 {
195 std::unique_lock<std::mutex> lock{chan.mtx_};
196 chan.waitBeforeRead(lock);
197
198 if (chan.closed() && chan.empty()) {
199 return chan;
200 }
201
202 if (!chan.empty()) {
203 out = std::move(chan.queue_.front());
204 chan.queue_.pop();
205 --chan.size_;
206 }
207 }
208
209 chan.cnd_.notify_one();
210
211 return chan;
212}
213
214} // namespace msd
215
216#endif // MSD_CHANNEL_HPP_
An iterator that block the current thread, waiting to fetch elements from the channel.
Definition blocking_iterator.hpp:20
Thread-safe container for sharing data between threads.
Definition channel.hpp:45
T value_type
The type of elements stored in the channel.
Definition channel.hpp:50
void close() noexcept
Closes the channel.
Definition channel.hpp:108
friend channel< Type > & operator>>(channel< Type > &, Type &)
Pops an element from the channel.
iterator end() noexcept
Returns an iterator representing the end of the channel.
Definition channel.hpp:137
iterator begin() noexcept
Returns an iterator to the beginning of the channel.
Definition channel.hpp:130
constexpr channel()=default
Creates an unbuffered channel.
NODISCARD bool closed() const noexcept
Checks if the channel has been closed.
Definition channel.hpp:123
NODISCARD bool constexpr empty() const noexcept
Checks if the channel is empty.
Definition channel.hpp:103
channel(const channel &)=delete
std::size_t size_type
The type used to represent sizes and counts.
Definition channel.hpp:60
friend channel< typename std::decay< Type >::type > & operator<<(channel< typename std::decay< Type >::type > &, Type &&)
Pushes an element into the channel.
constexpr channel(const size_type capacity)
Creates a buffered channel.
Definition channel.hpp:72
NODISCARD size_type constexpr size() const noexcept
Returns the current size of the channel.
Definition channel.hpp:95
Exception thrown if trying to write on closed channel.
Definition channel.hpp:27
closed_channel(const char *msg)
Constructs the exception with an error message.
Definition channel.hpp:34