Michael Ho has posted comments on this change.

Change subject: IMPALA-4026: Implement double-buffering for BlockingQueue
......................................................................
Patch Set 5:

(10 comments)

http://gerrit.cloudera.org:8080/#/c/4350/5/be/src/exec/hdfs-scan-node.cc
File be/src/exec/hdfs-scan-node.cc:

Line 79: row_batches_put_timer_ = runtime_profile()->AddCounter("QueuePutTime", TUnit::TIME_NS);
> we usually do this in Open() or Prepare() (see other counters in e.g. HdfsS
Done

http://gerrit.cloudera.org:8080/#/c/4350/5/be/src/util/blocking-queue.h
File be/src/util/blocking-queue.h:

PS5, Line 75: NotitfyAll
> NotifyAll().
Done

PS5, Line 90: DCHECK
> nit: DCHECK_NE
DCHECK removed.

PS5, Line 98: This may
            : // imply that some writers may be sleeping on a partially empty queue
> Maybe "If this race occurs, a writer can stay blocked on a partially empty
Done

PS5, Line 99: Given the major
            : // use case is with HDFS scan node which has multiple producers and one consumer, it's
            : // expected that some producers can make progress.
> Maybe more simply: "This should only occur when producers are faster than c
Done

Line 102: put_cv_.NotifyOne();
> is it worth explaining this race rather than fixing it? Doesn't pthreads o
Not sure I understand the optimization you are referring to here. The race is that a thread can call put_cv_.NotifyOne() after another thread has checked the queue's size but before it calls put_cv_.Wait(). AFAIK, the only way to avoid this race is to also grab "put_lock_" in BlockingGet(), which kind of defeats the purpose of the change.

PS5, Line 137: GetSize
> this is an unfortunate name. I read it to be the size of the "Get" list. M
Done

Line 171: // the queue's size could have changed once the lock is dropped.
> how do you know the deque::size() method doesn't need the synchronization (
Taking the lock would definitely be expensive, as a writer could then block the reader even if "get_list_" is not empty. On x86, an aligned 64-bit read should be atomic. That said, it's a good point that we cannot assume anything about the implementation of deque::size(). Added an AtomicInt64 for get_list_'s size to make sure every read of the size is atomic.
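For reference, a minimal sketch of the double-buffering scheme and the benign race discussed above, using standard C++ primitives (std::mutex, std::condition_variable, std::atomic<int64_t>) in place of Impala's own lock, ConditionVariable and AtomicInt64 types. DoubleBufferedQueue and RunProducersAndConsumer are hypothetical names, and this is not the code in the change. The sketch assumes one safeguard not covered in the comments above: when both lists are empty, the consumer notifies a producer while holding put_lock_, so a lost wakeup can only delay a producer rather than leave it blocked forever.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical double-buffered queue: producers append to put_list_ under put_lock_,
// consumers pop from get_list_ under get_lock_, and the lists are swapped when
// get_list_ runs dry.
template <typename T>
class DoubleBufferedQueue {
 public:
  explicit DoubleBufferedQueue(size_t max_elements) : max_elements_(max_elements) {}

  void BlockingPut(T val) {
    std::unique_lock<std::mutex> put_lock(put_lock_);
    // get_list_size_ is atomic, so producers can read it without taking get_lock_.
    while (put_list_.size() + static_cast<size_t>(get_list_size_.load()) >= max_elements_) {
      put_cv_.wait(put_lock);
    }
    put_list_.push_back(std::move(val));
    put_lock.unlock();
    get_cv_.notify_one();
  }

  T BlockingGet() {
    std::unique_lock<std::mutex> get_lock(get_lock_);
    if (get_list_.empty()) {
      // Lock order is always get_lock_ before put_lock_.
      std::unique_lock<std::mutex> put_lock(put_lock_);
      while (put_list_.empty()) {
        // Both lists are empty. While put_lock_ is held, no producer can be between
        // its size check and wait(), so this wakeup cannot be lost.
        put_cv_.notify_one();
        get_cv_.wait(put_lock);
      }
      get_list_.swap(put_list_);  // deque::swap is O(1); no per-element copying
      get_list_size_.store(static_cast<int64_t>(get_list_.size()));
    }
    assert(!get_list_.empty());
    T val = std::move(get_list_.front());
    get_list_.pop_front();
    get_list_size_.fetch_sub(1);
    get_lock.unlock();
    // Notified without put_lock_: a producer that has checked the size but not yet
    // called wait() misses this wakeup and stays blocked until a later notification.
    put_cv_.notify_one();
    return val;
  }

 private:
  const size_t max_elements_;
  std::mutex get_lock_;             // serializes consumers; protects get_list_
  std::mutex put_lock_;             // protects put_list_
  std::condition_variable get_cv_;  // consumer waits here (with put_lock_) for data
  std::condition_variable put_cv_;  // producers wait here (with put_lock_) for space
  std::deque<T> get_list_;
  std::deque<T> put_list_;
  std::atomic<int64_t> get_list_size_{0};
};

// Hypothetical usage: several producers feed one consumer; returns the sum received.
int64_t RunProducersAndConsumer(int num_producers, int items_per_producer, size_t capacity) {
  DoubleBufferedQueue<int64_t> queue(capacity);
  std::vector<std::thread> producers;
  for (int p = 0; p < num_producers; ++p) {
    producers.emplace_back([&queue, items_per_producer] {
      for (int i = 1; i <= items_per_producer; ++i) queue.BlockingPut(i);
    });
  }
  int64_t sum = 0;
  for (int64_t n = 0; n < static_cast<int64_t>(num_producers) * items_per_producer; ++n) {
    sum += queue.BlockingGet();
  }
  for (std::thread& t : producers) t.join();
  return sum;
}
```

The extra notification under put_lock_ is only paid when the consumer has run out of work, so the common BlockingGet() path still never takes put_lock_, which is the point of the change.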
Line 197: boost::scoped_ptr<std::deque<T>> put_list_;
> why add this extra indirection? couldn't we just do deque::swap() directly
Good point. Done. Also rearranged the class members a bit.

http://gerrit.cloudera.org:8080/#/c/4350/5/be/src/util/condition-variable.h
File be/src/util/condition-variable.h:

PS5, Line 29: doesn't have any logic to deal with thread interruption
> what's the implication of that? are signals not handled properly?
The thread interruption feature has nothing to do with signal handling. It's a Boost library feature which we don't use (at least not for BlockingQueue). It's basically a way for one thread to interrupt another thread at well-defined points in the code. https://www.justsoftwaresolutions.co.uk/threading/thread-interruption-in-boost-thread-library.html

-- 
To view, visit http://gerrit.cloudera.org:8080/4350
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ib9f4cf351455efefb0f3bb791cf9bc82d1421d54
Gerrit-PatchSet: 5
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Alex Behm <alex.b...@cloudera.com>
Gerrit-Reviewer: Chen Huang <paulhuan...@utexas.edu>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Mostafa Mokhtar <mmokh...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <tarmstr...@cloudera.com>
Gerrit-HasComments: Yes