iChauster commented on code in PR #13771:
URL: https://github.com/apache/arrow/pull/13771#discussion_r937799114
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -46,52 +46,66 @@ typedef uint64_t row_index_t;
typedef int col_index_t;
/**
- * Simple implementation for an unbound concurrent queue
+ * Simple implementation for a bounded concurrent queue
*/
template <class T>
-class ConcurrentQueue {
+class ConcurrentBoundedQueue {
+ size_t _remaining;
+ std::vector<T> _buffer;
+ mutable std::mutex _gate;
+ std::condition_variable _not_full;
+ std::condition_variable _not_empty;
+
+ size_t _next_push = 0;
+ size_t _next_pop = 0;
+
public:
+ explicit ConcurrentBoundedQueue(size_t capacity)
+ : _remaining(capacity), _buffer(capacity) {}
+
+ // Push new value to queue, waiting for capacity indefinitely.
+ void Push(const T& t) {
+ std::unique_lock<std::mutex> lock(_gate);
+ _not_full.wait(lock, [&] { return _remaining > 0; });
+ _buffer[_next_push++] = t;
+ _next_push %= _buffer.size();
+ --_remaining;
+ _not_empty.notify_one();
+ }
+ // Get oldest value from queue, or wait indefinitely for it.
T Pop() {
- std::unique_lock<std::mutex> lock(mutex_);
- cond_.wait(lock, [&] { return !queue_.empty(); });
- auto item = queue_.front();
- queue_.pop();
- return item;
- }
-
- void Push(const T& item) {
- std::unique_lock<std::mutex> lock(mutex_);
- queue_.push(item);
- cond_.notify_one();
+ std::unique_lock<std::mutex> lock(_gate);
+ _not_empty.wait(lock, [&] { return _remaining < _buffer.size(); });
+ T r = _buffer[_next_pop++];
+ _next_pop %= _buffer.size();
+ ++_remaining;
+ _not_full.notify_one();
+ return r;
}
-
+ // Try to pop the oldest value from the queue (or return nullopt if none)
util::optional<T> TryPop() {
- // Try to pop the oldest value from the queue (or return nullopt if none)
- std::unique_lock<std::mutex> lock(mutex_);
- if (queue_.empty()) {
- return util::nullopt;
- } else {
- auto item = queue_.front();
- queue_.pop();
- return item;
- }
+ std::unique_lock<std::mutex> lock(_gate);
+ if (_remaining == _buffer.size()) return util::nullopt;
+ T r = _buffer[_next_pop++];
+ _next_pop %= _buffer.size();
+ ++_remaining;
+ _not_full.notify_one();
+ return r;
}
+ // Test whether empty
bool Empty() const {
- std::unique_lock<std::mutex> lock(mutex_);
- return queue_.empty();
+ std::unique_lock<std::mutex> lock(_gate);
+ return _remaining == _buffer.size();
}
+ int Size() const { return _buffer.size(); }
Review Comment:
Hey Weston, I'm not sure how to run TSAN locally with `ninja`. Is there some
documentation on how to do this?
In any case, I don't actually think we use this method anywhere -- I'm going
to remove this for now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]