[Impala-ASF-CR] IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp
Sahil Takiar has posted comments on this change. ( http://gerrit.cloudera.org:8080/13883 )

Change subject: IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp

Patch Set 3:

(27 comments)

http://gerrit.cloudera.org:8080/#/c/13883/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/13883/3//COMMIT_MSG@9
PS3, Line 9: a blocking and
: non-blocking implementation
> It may help to spell out the exact difference in the two categories.
Updated the commit message now that the changes to add the generic RowBatchQueue aren't going in this patch.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h
File be/src/exec/buffered-plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@29
PS3, Line 29: The blocking behavior follows
: /// the same semantics as BlockingPlanRootSink.
> Yes, I guess at some point soon, we may want to consolidate on the implemen
Done

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@42
PS3, Line 42: RowBatchQueue* batch_queue)
> Thanks for the link. I can see the advantage being that different users of
Agreed. Changed so that BufferedPlanRootSink owns the given batch_queue.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@65
PS3, Line 65: boost::mutex
> std::mutex. IIUC, the general direction is to move away from boost library.
It seems that our ConditionVariables (util/condition-variable.h) are tied to boost::mutex-es, so I can't use std::mutex here without some more re-factoring.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.cc@38
PS3, Line 38: output_batch->AcquireState(batch);
> This is not making a copy. This is actually transferring the ownership of t
Replaced this with DeepCopyTo per Tim's suggestion.
Note this will probably change when we move to the BufferedTupleStream implementation, as a BTS does its own copy of the given batch. The call to DeepCopyTo is necessary when using a std::deque because the FragmentInstanceState owns the given RowBatch.

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/buffered-plan-root-sink.cc@38
PS7, Line 38: output_batch->AcquireState(batch);
> AcquireState isn't safe to use here - it doesn't copy into fresh memory, ju
Thanks for the info, Tim. Replaced this with DeepCopyTo. I think BufferedTupleStream is already doing something like DeepCopyTo, correct? Currently, the copy is necessary because FragmentInstanceState owns the given RowBatch. However, once we migrate to the BTS queue, the call to DeepCopyTo shouldn't be necessary, because BTS already does a copy, right?

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/data-sink.cc
File be/src/exec/data-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/data-sink.cc@114
PS3, Line 114: new NonBlockingRowBatchQueue(10))
> Who owns this object ? It doesn't look like it's being added to pool. We us
Fixed so that BufferedPlanRootSink owns and manages the lifecycle of the queue; the queue is managed by a unique_ptr.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/plan-root-sink.h
File be/src/exec/plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/plan-root-sink.h@61
PS3, Line 61: = 0;
> Is it still valid in this patch ? Given PlanRootSink::Send() is not pure vi
Per Tim's suggestion, I moved the PlanRootSink::Send implementation to a new helper method called ValidateRowBatch.
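To illustrate the ValidateRowBatch change discussed above, here is a minimal sketch of the pattern: Send() stays pure virtual, and the shared checks live in a protected helper that each subclass calls. The types Status, RowBatch, PlanRootSinkSketch and CountingSink are toy stand-ins invented for this example, and the checks inside the helper are assumptions, not what the patch actually validates.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Toy stand-ins (hypothetical); Impala's real Status and RowBatch are far richer.
struct Status {
  bool ok;
  std::string msg;
  static Status OK() { return {true, ""}; }
  static Status Error(std::string m) { return {false, std::move(m)}; }
};
struct RowBatch { int num_rows; };

class PlanRootSinkSketch {
 public:
  virtual ~PlanRootSinkSketch() = default;
  // Pure virtual: every subclass must provide a complete Send().
  virtual Status Send(RowBatch* batch) = 0;

 protected:
  // Shared validation helper. The specific checks are illustrative assumptions.
  Status ValidateRowBatch(const RowBatch* batch) const {
    if (batch == nullptr) return Status::Error("null batch");
    if (batch->num_rows < 0) return Status::Error("negative row count");
    return Status::OK();
  }
};

// Hypothetical subclass that validates first, then does its own work.
class CountingSink : public PlanRootSinkSketch {
 public:
  Status Send(RowBatch* batch) override {
    Status s = ValidateRowBatch(batch);
    if (!s.ok) return s;
    rows_seen_ += batch->num_rows;
    return Status::OK();
  }
  int rows_seen() const { return rows_seen_; }

 private:
  int rows_seen_ = 0;
};
```

With this shape, nobody can mistake the base-class code for a usable Send(): the helper's name says it only validates.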
http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/plan-root-sink.cc
File be/src/exec/plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/plan-root-sink.cc@64
PS7, Line 64: Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
> I'd find it clearer if this was a helper function instead of the default im
Makes sense, moved this to a new method called ValidateRowBatch.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/scan-node.cc
File be/src/exec/scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/scan-node.cc@315
PS3, Line 315: Shutdown();
> Close();
Now that I removed the generic RowBatchQueue interface from this patch, BlockingRowBatchQueue has a Shutdown() method.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h
File be/src/runtime/blocking-row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@54
PS3, Line 54: public BlockingQueue, RowBatchBytesFn>,
: public RowBatchQueue {
> Not a big fan of multi-inheritance.
Now that I remo
[Impala-ASF-CR] IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp
Tim Armstrong has posted comments on this change. ( http://gerrit.cloudera.org:8080/13883 )

Change subject: IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp

Patch Set 7:

(2 comments)

Didn't do a full review, but wanted to point out a memory-safety issue because of the arcane memory transfer rules around RowBatches.

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/buffered-plan-root-sink.cc@38
PS7, Line 38: output_batch->AcquireState(batch);
AcquireState isn't safe to use here - it doesn't copy into fresh memory, just does a shallow move of the batch's state. You don't know what the batch might have pointers to. There are two potential problems:
* If the FLUSH_RESOURCES flag is set, it's necessary for correctness to copy this batch and all preceding batches, since the memory they reference may be going away
* The batches may be keeping alive a lot more memory than they're actually referencing, e.g. if a lot of rows got filtered out by predicates

NljBuilder correctly handled both of these problems, but I don't think it's worth the complexity of trying to do that (you'd, at minimum, need to iterate through the queue and deep copy all the batches). This is all a consequence of Impala very aggressively trying to avoid copying memory.

I'd recommend using DeepCopyTo() like here: https://github.com/apache/impala/blob/master/be/src/exec/nested-loop-join-builder.cc#L97

There are probably relatively few common query patterns that would exercise this since most common plans have an exchange feeding into PlanRootSink. A grouping aggregation feeding directly into PlanRootSink might be able to reproduce it (you'd have to set num_nodes=1 to get a non-distributed plan).
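The difference between a shallow move and a deep copy can be sketched with a toy batch type. ToyBatch below is a hypothetical stand-in, not Impala's RowBatch; it only borrows the method names AcquireState and DeepCopyTo from the discussion above, and it models "memory owned elsewhere" as a heap-allocated string held by the producer.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Toy batch (hypothetical): rows are raw pointers that may reference memory
// owned by someone else, e.g. the operator that produced the batch.
struct ToyBatch {
  std::vector<const std::string*> rows;              // possibly borrowed
  std::vector<std::unique_ptr<std::string>> owned;   // memory this batch owns

  // Shallow move: takes over src's pointers and owned buffers, copies no bytes.
  // Any row that pointed at memory owned elsewhere still points there.
  void AcquireState(ToyBatch* src) {
    rows = std::move(src->rows);
    owned = std::move(src->owned);
    src->rows.clear();
    src->owned.clear();
  }

  // Deep copy: duplicates every row's bytes into memory owned by dst, so dst
  // stays valid after the original memory is freed.
  void DeepCopyTo(ToyBatch* dst) const {
    dst->rows.clear();
    dst->owned.clear();
    for (const std::string* row : rows) {
      dst->owned.push_back(std::make_unique<std::string>(*row));
      dst->rows.push_back(dst->owned.back().get());
    }
  }
};
```

In this toy model, a batch queued via AcquireState still aliases the producer's buffer and dangles once the producer frees it, while a batch queued via DeepCopyTo can safely outlive the producer.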
http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/plan-root-sink.cc
File be/src/exec/plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/plan-root-sink.cc@64
PS7, Line 64: Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
I'd find it clearer if this was a helper function instead of the default implementation of Send() - that would make it clear that it wasn't a (possibly) complete implementation of Send(), which is what I'd generally expect for a default implementation.

--
To view, visit http://gerrit.cloudera.org:8080/13883
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 7
Gerrit-Owner: Sahil Takiar
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Sahil Takiar
Gerrit-Reviewer: Tim Armstrong
Gerrit-Comment-Date: Wed, 24 Jul 2019 17:30:23 +
Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13883 )

Change subject: IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp

Patch Set 6:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/3970/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.

--
To view, visit http://gerrit.cloudera.org:8080/13883
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 6
Gerrit-Owner: Sahil Takiar
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Sahil Takiar
Gerrit-Reviewer: Tim Armstrong
Gerrit-Comment-Date: Wed, 24 Jul 2019 03:53:32 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp
Hello Michael Ho, Tim Armstrong, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/13883

to look at the new patch set (#7).

Change subject: IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp

IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp

Introduces a generic RowBatchQueue interface with a blocking and a non-blocking implementation. The blocking implementation is a re-factored version of the current RowBatchQueue. The non-blocking implementation is a simple wrapper around std::queue.

The current RowBatchQueue, which is used by the scanners, is renamed to BlockingRowBatchQueue and it is a subclass of the new RowBatchQueue interface. This patch stops short of completely abstracting all the details of the current RowBatchQueue and instead includes a few TODOs.

NonBlockingRowBatchQueue has a max capacity, after which calls to AddBatch will return false.

Implements BufferedPlanRootSink using the new RowBatchQueue interface. Currently, the NonBlockingRowBatchQueue is injected into the BufferedPlanRootSink; however, the implementation of BufferedPlanRootSink is not tied to NonBlockingRowBatchQueue, although it does assume the RowBatchQueue is not thread safe. This allows a future patch to add a RowBatchQueue backed by a BufferedTupleStream without re-factoring BufferedPlanRootSink.

BufferedPlanRootSink FlushFinal blocks until the consumer thread has processed all RowBatches. This ensures that the coordinator fragment stays alive until all results are fetched, but allows all other fragments to be shut down immediately.
Testing:
* Running core tests
* Updated tests/query_test/test_result_spooling.py

Follow up work:
* Add a stress test in test_result_spooling.py to validate the synchronization logic in BufferedPlanRootSink
* Handle Send calls where num_results < batch->num_rows()
* Add a direct write path in Send that directly writes a RowBatch to a QueryResultSet, if one is available and if the RowBatchQueue is empty
* Implement a RowBatchQueue backed by a BufferedTupleStream
* Re-factor the resource management logic to release all non-coordinator fragment resources

Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
---
M be/src/exec/blocking-plan-root-sink.cc
M be/src/exec/blocking-plan-root-sink.h
M be/src/exec/buffered-plan-root-sink.cc
M be/src/exec/buffered-plan-root-sink.h
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-scan-node.cc
M be/src/exec/kudu-scan-node.cc
M be/src/exec/plan-root-sink.cc
M be/src/exec/plan-root-sink.h
M be/src/exec/scan-node.cc
M be/src/exec/scan-node.h
M be/src/exec/scanner-context.cc
M be/src/runtime/CMakeLists.txt
R be/src/runtime/blocking-row-batch-queue.cc
A be/src/runtime/blocking-row-batch-queue.h
A be/src/runtime/non-blocking-row-batch-queue.cc
A be/src/runtime/non-blocking-row-batch-queue.h
M be/src/runtime/row-batch-queue.h
M be/src/util/blocking-queue.h
M tests/query_test/test_result_spooling.py
20 files changed, 464 insertions(+), 97 deletions(-)

  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/83/13883/7
--
To view, visit http://gerrit.cloudera.org:8080/13883
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 7
Gerrit-Owner: Sahil Takiar
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Sahil Takiar
Gerrit-Reviewer: Tim Armstrong
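
For readers following the commit message above, the non-blocking queue it describes (a thin wrapper around std::queue with a max capacity, where AddBatch returns false once full) might look roughly like the sketch below. The class name NonBlockingQueueSketch, the GetBatch and IsEmpty methods, and the toy RowBatch are assumptions made for illustration; only the AddBatch name and its return-false-when-full behavior come from the commit message. Like the queue in the patch, this sketch is not thread safe.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <queue>
#include <utility>

struct RowBatch { int num_rows; };  // toy stand-in, not Impala's RowBatch

// Hypothetical bounded, non-blocking, single-threaded queue of batches.
class NonBlockingQueueSketch {
 public:
  explicit NonBlockingQueueSketch(std::size_t max_batches)
      : max_batches_(max_batches) {}

  // Returns false without taking ownership if the queue is at capacity,
  // so the caller still holds the batch and can retry later.
  bool AddBatch(std::unique_ptr<RowBatch>&& batch) {
    if (batches_.size() >= max_batches_) return false;
    batches_.push(std::move(batch));
    return true;
  }

  // Returns the oldest batch, or nullptr if the queue is empty. Never blocks.
  std::unique_ptr<RowBatch> GetBatch() {
    if (batches_.empty()) return nullptr;
    std::unique_ptr<RowBatch> front = std::move(batches_.front());
    batches_.pop();
    return front;
  }

  bool IsEmpty() const { return batches_.empty(); }

 private:
  const std::size_t max_batches_;
  std::queue<std::unique_ptr<RowBatch>> batches_;
};
```

Because the queue never blocks, any waiting (for example, a producer pausing while the queue is full) has to be done by the owner with its own lock and condition variable, which is the role BufferedPlanRootSink plays in the description above.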