pitrou commented on code in PR #34392: URL: https://github.com/apache/arrow/pull/34392#discussion_r1196711606
########## cpp/src/arrow/acero/asof_join_node.cc: ########## @@ -239,24 +344,45 @@ struct MemoStore { // the time of the current entry, defaulting to 0. // when entries with a time less than T are removed, the current time is updated to the // time of the next (by-time) and now-current entry or to T if no such entry exists. - OnType current_time_; + std::atomic<OnType> current_time_; // current entry per key std::unordered_map<ByType, Entry> entries_; // future entries per key std::unordered_map<ByType, std::queue<Entry>> future_entries_; // current and future (distinct) times of existing entries std::deque<OnType> times_; +#ifndef NDEBUG + // Owning node + AsofJoinNode* node_; + // Index of owning input + size_t index_; +#endif void swap(MemoStore& memo) { +#ifndef NDEBUG + std::swap(node_, memo.node_); + std::swap(index_, memo.index_); +#endif std::swap(no_future_, memo.no_future_); - std::swap(current_time_, memo.current_time_); + current_time_ = memo.current_time_.exchange(static_cast<OnType>(current_time_)); entries_.swap(memo.entries_); future_entries_.swap(memo.future_entries_); times_.swap(memo.times_); } + bool UpdateTime(OnType ts) { + OnType prev_time = current_time_; + bool update = prev_time < ts; + while (prev_time < ts && current_time_.compare_exchange_weak(prev_time, ts)) { + // intentionally empty - standard CAS loop Review Comment: I'm not sure I understand. A standard CAS loop would retry when the CAS operation fails, right? Here, it retries when the CAS succeeds... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org