[Impala-ASF-CR] IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp

2019-07-24 Thread Sahil Takiar (Code Review)
Sahil Takiar has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/13883 )

Change subject: IMPALA-8779, IMPALA-8780: RowBatchQueue interface and 
BufferedPRS imp
..


Patch Set 3:

(27 comments)

http://gerrit.cloudera.org:8080/#/c/13883/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/13883/3//COMMIT_MSG@9
PS3, Line 9: a blocking and
   : non-blocking implementation
> It may help to spell out the exact difference in the two categories.
Updated the commit message now that the changes to add the generic 
RowBatchQueue aren't going in this patch.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h
File be/src/exec/buffered-plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@29
PS3, Line 29: The blocking behavior follows
: /// the same semantics as BlockingPlanRootSink.
> Yes, I guess at some point soon, we may want to consolidate on the implemen
Done


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@42
PS3, Line 42: RowBatchQueue* batch_queue)
> Thanks for the link. I can see the advantage being that different users of
Agreed. Changed so that BufferedPlanRootSink owns the given batch_queue.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@65
PS3, Line 65: boost::mutex
> std::mutex. IIUC, the general direction is to move away from boost library.
It seems that our ConditionVariables (util/condition-variable.h) are tied to
boost::mutex-es, so I can't use std::mutex here without some more
re-factoring.
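For comparison, here is a minimal sketch that uses only the C++ standard library. It does not use Impala's util/condition-variable.h, whose internals are not shown in this thread. `std::condition_variable::wait()` only accepts a `std::unique_lock<std::mutex>`, so code built on it is tied to one mutex type. `std::condition_variable_any` accepts any lockable type. The `Flag` and `AnyLockFlag` names are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical one-shot flag. std::condition_variable::wait() requires a
// std::unique_lock<std::mutex>, so the mutex type is fixed to std::mutex.
struct Flag {
  std::mutex lock;
  std::condition_variable cv;
  bool ready = false;

  void Set() {
    {
      std::lock_guard<std::mutex> l(lock);
      ready = true;
    }
    cv.notify_all();
  }

  void Wait() {
    std::unique_lock<std::mutex> l(lock);
    cv.wait(l, [this] { return ready; });
  }
};

// std::condition_variable_any can wait on any lock type (here a
// std::unique_lock<std::recursive_mutex>), usually at some extra cost.
struct AnyLockFlag {
  std::recursive_mutex lock;
  std::condition_variable_any cv;
  bool ready = false;

  void Set() {
    {
      std::lock_guard<std::recursive_mutex> l(lock);
      ready = true;
    }
    cv.notify_all();
  }

  void Wait() {
    std::unique_lock<std::recursive_mutex> l(lock);
    cv.wait(l, [this] { return ready; });
  }
};
```

A wrapper built on the first pattern has to change its lock types before it can accept a different mutex. That is the kind of refactoring the reply refers to.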


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.cc@38
PS3, Line 38: output_batch->AcquireState(batch);
> This is not making a copy. This is actually transferring the ownership of t
Replaced this with DeepCopyTo per Tim's suggestion. Note this will probably 
change when we move to the BufferedTupleStream implementation as a BTS does its 
own copy of the given batch.

The call to DeepCopyTo is necessary when using a std::deque because the 
FragmentInstanceState owns the given RowBatch.


http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/buffered-plan-root-sink.cc@38
PS7, Line 38:   output_batch->AcquireState(batch);
> AcquireState isn't safe to use here - it doesn't copy into fresh memory, ju
Thanks for the info, Tim. Replaced this with DeepCopyTo. I think
BufferedTupleStream is already doing something like DeepCopyTo, correct?

Currently, the copy is necessary because FragmentInstanceState owns the given 
RowBatch. However, once we migrate to the BTS queue, the call to DeepCopyTo 
shouldn't be necessary, because BTS already does a copy, right?


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/data-sink.cc
File be/src/exec/data-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/data-sink.cc@114
PS3, Line 114: new NonBlockingRowBatchQueue(10))
> Who owns this object ? It doesn't look like it's being added to pool. We us
Fixed so that BufferedPlanRootSink owns and manages the lifecycle of the queue;
the queue is managed by a unique_ptr.
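Here is a minimal sketch of the ownership model described above. It uses hypothetical `ToyQueue`/`ToySink` classes rather than Impala's real ones. The sink receives the queue as a `std::unique_ptr`. As a result, the queue is destroyed exactly once, together with the sink, and never needs to be registered in an object pool.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical minimal interface standing in for RowBatchQueue.
class ToyQueue {
 public:
  virtual ~ToyQueue() = default;
  virtual bool IsEmpty() const = 0;
};

// Hypothetical implementation that counts its own destruction.
class ToyDequeQueue : public ToyQueue {
 public:
  explicit ToyDequeQueue(int* destroyed) : destroyed_(destroyed) {}
  ~ToyDequeQueue() override { ++*destroyed_; }
  bool IsEmpty() const override { return true; }

 private:
  int* destroyed_;
};

// The sink takes ownership of the queue; its lifetime ends with the sink.
class ToySink {
 public:
  explicit ToySink(std::unique_ptr<ToyQueue> queue) : queue_(std::move(queue)) {}
  bool QueueIsEmpty() const { return queue_->IsEmpty(); }

 private:
  std::unique_ptr<ToyQueue> queue_;
};
```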


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/plan-root-sink.h
File be/src/exec/plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/plan-root-sink.h@61
PS3, Line 61:  = 0;
> Is it still valid in this patch ? Given PlanRootSink::Send() is not pure vi
Per Tim's suggestion, I moved the PlanRootSink::Send implementation to a new
helper method called ValidateRowBatch.


http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/plan-root-sink.cc
File be/src/exec/plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/plan-root-sink.cc@64
PS7, Line 64: Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
> I'd find it clearer if this was a helper function instead of the default im
Makes sense, moved this to a new method called ValidateRowBatch.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/scan-node.cc
File be/src/exec/scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/scan-node.cc@315
PS3, Line 315: Shutdown();
> Close();
Now that I removed the generic RowBatchQueue interface from this patch, 
BlockingRowBatchQueue has a Shutdown() method.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h
File be/src/runtime/blocking-row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@54
PS3, Line 54:  public BlockingQueue, RowBatchBytesFn>,
:   public RowBatchQueue {
> Not a big fan of multi-inheritance.
Now that I remo

[Impala-ASF-CR] IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp

2019-07-24 Thread Tim Armstrong (Code Review)
Tim Armstrong has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/13883 )

Change subject: IMPALA-8779, IMPALA-8780: RowBatchQueue interface and 
BufferedPRS imp
..


Patch Set 7:

(2 comments)

Didn't do a full review, but wanted to point out a memory-safety issue because 
of the arcane memory transfer rules around RowBatches.

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/buffered-plan-root-sink.cc@38
PS7, Line 38:   output_batch->AcquireState(batch);
AcquireState isn't safe to use here: it doesn't copy into fresh memory, it just
does a shallow move of the batch's state. You don't know what the batch might
have pointers to. There are two potential problems:
* If the FLUSH_RESOURCES flag is set, it's necessary for correctness to copy
this batch and all preceding batches, since the memory they reference may be
going away.
* The batches may be keeping alive a lot more memory than they actually
reference, e.g. if a lot of rows got filtered out by predicates.

NljBuilder correctly handled both of these problems, but I don't think it's 
worth the complexity of trying to do that (you'd, at minimum, need to iterate 
through the queue and deep copy all the batches).

This is all a consequence of Impala very aggressively trying to avoid copying 
memory.

I'd recommend using DeepCopyTo() like here: 
https://github.com/apache/impala/blob/master/be/src/exec/nested-loop-join-builder.cc#L97
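The following heavily simplified sketch illustrates the difference described above. `ToyRowBatch` is hypothetical and is not Impala's RowBatch; its rows are pointers into memory that may belong to someone else. A shallow transfer (in the spirit of AcquireState) only moves pointers, so the destination stays valid only as long as that memory does. A deep copy (in the spirit of DeepCopyTo) copies just the referenced rows into memory the destination owns.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, heavily simplified stand-in for a RowBatch.
struct ToyRowBatch {
  // Row data; may point into memory owned by another operator.
  std::vector<const std::string*> rows;
  // Memory that this batch owns (and keeps alive).
  std::vector<std::unique_ptr<std::string>> owned;

  // Shallow transfer: moves pointers and owned buffers but never copies data.
  // Rows pointing into borrowed memory still depend on the original owner,
  // and every owned buffer moves along, even ones no row references anymore.
  void ShallowAcquire(ToyRowBatch* src) {
    rows = std::move(src->rows);
    for (auto& buf : src->owned) owned.push_back(std::move(buf));
    src->rows.clear();
    src->owned.clear();
  }

  // Deep copy: copies only the referenced rows into buffers owned by dst, so
  // the source (and anything it borrowed from) can be released afterwards.
  void DeepCopyTo(ToyRowBatch* dst) const {
    for (const std::string* row : rows) {
      dst->owned.push_back(std::make_unique<std::string>(*row));
      dst->rows.push_back(dst->owned.back().get());
    }
  }
};
```

In this toy, the deep copy also addresses the second problem. Buffers that only backed filtered-out rows are not carried over, because only rows that are still referenced get copied.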

There are probably relatively few common query patterns that would exercise 
this since most common plans have an exchange feeding into PlanRootSink. A 
grouping aggregation feeding directly into PlanRootSink might be able to 
reproduce it (you'd have to set num_nodes=1 to get a non-distributed plan).


http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/plan-root-sink.cc
File be/src/exec/plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/plan-root-sink.cc@64
PS7, Line 64: Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
I'd find it clearer if this was a helper function instead of the default 
implementation of Send() - that would make it clear that it wasn't a (possibly) 
complete implementation of Send(), which is what I'd generally expect for a 
default implementation.
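Here is a sketch of the structure suggested above. It uses hypothetical names (`ToyPlanRootSink`, `ToyStatus`, `FakeBatch`) and a placeholder check, because the real ValidateRowBatch logic is not shown in this thread. Send() stays pure virtual in the base class. The shared logic becomes a protected helper that each subclass calls explicitly.

```cpp
#include <cassert>
#include <string>

// Hypothetical status type (Impala has its own Status class).
struct ToyStatus {
  bool ok;
  std::string msg;
  static ToyStatus OK() { return {true, ""}; }
};

// Hypothetical batch: only a row count.
struct FakeBatch {
  int num_rows;
};

class ToyPlanRootSink {
 public:
  virtual ~ToyPlanRootSink() = default;

  // No default implementation: every subclass must provide a complete Send().
  virtual ToyStatus Send(const FakeBatch& batch) = 0;

 protected:
  // Shared checks that subclasses call explicitly. The check below is a
  // placeholder for illustration only.
  ToyStatus ValidateRowBatch(const FakeBatch& batch) const {
    if (batch.num_rows < 0) return {false, "invalid row count"};
    return ToyStatus::OK();
  }
};

class ToyValidatingSink : public ToyPlanRootSink {
 public:
  ToyStatus Send(const FakeBatch& batch) override {
    ToyStatus status = ValidateRowBatch(batch);
    if (!status.ok) return status;
    rows_received += batch.num_rows;
    return ToyStatus::OK();
  }

  int rows_received = 0;
};
```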



--
To view, visit http://gerrit.cloudera.org:8080/13883
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 7
Gerrit-Owner: Sahil Takiar 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sahil Takiar 
Gerrit-Reviewer: Tim Armstrong 
Gerrit-Comment-Date: Wed, 24 Jul 2019 17:30:23 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp

2019-07-23 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/13883 )

Change subject: IMPALA-8779, IMPALA-8780: RowBatchQueue interface and 
BufferedPRS imp
..


Patch Set 6:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/3970/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.



Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 6
Gerrit-Owner: Sahil Takiar 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sahil Takiar 
Gerrit-Reviewer: Tim Armstrong 
Gerrit-Comment-Date: Wed, 24 Jul 2019 03:53:32 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp

2019-07-23 Thread Sahil Takiar (Code Review)
Hello Michael Ho, Tim Armstrong, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/13883

to look at the new patch set (#7).

Change subject: IMPALA-8779, IMPALA-8780: RowBatchQueue interface and 
BufferedPRS imp
..

IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp

Introduces a generic RowBatchQueue interface with a blocking and
non-blocking implementation. The blocking implementation is a
re-factored version of the current RowBatchQueue. The non-blocking
implementation is a simple wrapper around std::queue. The current
RowBatchQueue, which is used by the scanners, is renamed to
BlockingRowBatchQueue and it is a subclass of the new RowBatchQueue
interface. This patch stops short of completely abstracting all the
details of the current RowBatchQueue and instead includes a few TODOs.
NonBlockingRowBatchQueue has a maximum capacity, after which calls to
AddBatch will return false.
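Here is a minimal sketch of the non-blocking behaviour described above, using hypothetical names (`ToyNonBlockingQueue`, `FakeRowBatch`). The queue wraps std::queue, is not thread safe, and `AddBatch` returns false instead of blocking once the maximum capacity is reached. A blocking queue would instead make the caller wait until space frees up.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical stand-in for a RowBatch.
struct FakeRowBatch {
  int num_rows;
};

// Bounded queue over std::queue. Not thread safe: callers must synchronize.
class ToyNonBlockingQueue {
 public:
  explicit ToyNonBlockingQueue(std::size_t max_batches)
    : max_batches_(max_batches) {}

  // Returns false instead of blocking when full. Only moves from `batch` on
  // success, so the caller still owns it when false is returned.
  bool AddBatch(std::unique_ptr<FakeRowBatch>&& batch) {
    if (IsFull()) return false;
    batches_.push(std::move(batch));
    return true;
  }

  // Returns nullptr instead of blocking when empty.
  std::unique_ptr<FakeRowBatch> GetBatch() {
    if (batches_.empty()) return nullptr;
    std::unique_ptr<FakeRowBatch> batch = std::move(batches_.front());
    batches_.pop();
    return batch;
  }

  bool IsFull() const { return batches_.size() >= max_batches_; }
  bool IsEmpty() const { return batches_.empty(); }

 private:
  const std::size_t max_batches_;
  std::queue<std::unique_ptr<FakeRowBatch>> batches_;
};
```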

Implements BufferedPlanRootSink using the new RowBatchQueue interface.
Currently, the NonBlockingRowBatchQueue is injected into the
BufferedPlanRootSink. However, the implementation of
BufferedPlanRootSink is not tied to NonBlockingRowBatchQueue; it only
assumes that the RowBatchQueue is not thread safe. This allows a
future patch to add a RowBatchQueue backed by a BufferedTupleStream
without re-factoring BufferedPlanRootSink.
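Because the queue is assumed not to be thread safe, the sink has to serialize access to it. Here is a minimal sketch of one way to do this, using hypothetical names with an `int` standing in for a RowBatch and a std::deque standing in for the queue. A single mutex guards the queue. Two condition variables let the producer wait while the queue is full and the consumer wait while it is empty.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

class ToyBufferedSink {
 public:
  explicit ToyBufferedSink(std::size_t capacity) : capacity_(capacity) {
    assert(capacity_ > 0);  // a zero capacity would block Send() forever
  }

  // Producer side: waits while the queue is full.
  void Send(int batch) {
    std::unique_lock<std::mutex> l(lock_);
    not_full_.wait(l, [this] { return queue_.size() < capacity_; });
    queue_.push_back(batch);
    not_empty_.notify_one();
  }

  // Consumer side: waits while the queue is empty.
  int GetNext() {
    std::unique_lock<std::mutex> l(lock_);
    not_empty_.wait(l, [this] { return !queue_.empty(); });
    int batch = queue_.front();
    queue_.pop_front();
    not_full_.notify_one();
    return batch;
  }

 private:
  const std::size_t capacity_;
  std::mutex lock_;                    // guards queue_
  std::condition_variable not_full_;   // signalled after a batch is removed
  std::condition_variable not_empty_;  // signalled after a batch is added
  std::deque<int> queue_;              // stand-in for a non-thread-safe queue
};
```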

BufferedPlanRootSink::FlushFinal blocks until the consumer thread has
processed all RowBatches. This ensures that the coordinator fragment
stays alive until all results are fetched, but allows all other
fragments to be shut down immediately.
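Here is a minimal sketch of the FlushFinal() behaviour described above, with hypothetical names and an `int` in place of a RowBatch. FlushFinal() marks end of input and waits on a condition variable until the consumer has drained the queue. The consumer can therefore still distinguish "no batch yet" from "no more batches".

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>

class ToyFlushingSink {
 public:
  // Producer: buffer a batch and wake a waiting consumer.
  void Send(int batch) {
    std::lock_guard<std::mutex> l(lock_);
    queue_.push_back(batch);
    batch_available_.notify_one();
  }

  // Producer: signal end of input, then block until every batch is consumed.
  void FlushFinal() {
    std::unique_lock<std::mutex> l(lock_);
    eos_ = true;
    batch_available_.notify_all();  // wake a consumer waiting on an empty queue
    consumer_done_.wait(l, [this] { return queue_.empty(); });
  }

  // Consumer: blocks until a batch is available; returns false at end of input.
  bool GetNext(int* batch) {
    std::unique_lock<std::mutex> l(lock_);
    batch_available_.wait(l, [this] { return !queue_.empty() || eos_; });
    if (queue_.empty()) return false;  // eos_ is set and nothing is left
    *batch = queue_.front();
    queue_.pop_front();
    if (queue_.empty()) consumer_done_.notify_all();
    return true;
  }

 private:
  std::mutex lock_;
  std::condition_variable batch_available_;
  std::condition_variable consumer_done_;
  std::deque<int> queue_;
  bool eos_ = false;
};
```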

Testing:
* Running core tests
* Updated tests/query_test/test_result_spooling.py

Follow up work:
* Add a stress test in test_result_spooling.py to validate the
synchronization logic in BufferedPlanRootSink
* Handle Send calls where num_results < batch->num_rows()
* Add a direct write path in Send that directly writes a RowBatch to a
QueryResultSet, if one is available and if the RowBatchQueue is empty
* Implement a RowBatchQueue backed by a BufferedTupleStream
* Re-factor the resource management logic to release all
non-coordinator fragment resources

Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
---
M be/src/exec/blocking-plan-root-sink.cc
M be/src/exec/blocking-plan-root-sink.h
M be/src/exec/buffered-plan-root-sink.cc
M be/src/exec/buffered-plan-root-sink.h
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-scan-node.cc
M be/src/exec/kudu-scan-node.cc
M be/src/exec/plan-root-sink.cc
M be/src/exec/plan-root-sink.h
M be/src/exec/scan-node.cc
M be/src/exec/scan-node.h
M be/src/exec/scanner-context.cc
M be/src/runtime/CMakeLists.txt
R be/src/runtime/blocking-row-batch-queue.cc
A be/src/runtime/blocking-row-batch-queue.h
A be/src/runtime/non-blocking-row-batch-queue.cc
A be/src/runtime/non-blocking-row-batch-queue.h
M be/src/runtime/row-batch-queue.h
M be/src/util/blocking-queue.h
M tests/query_test/test_result_spooling.py
20 files changed, 464 insertions(+), 97 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/83/13883/7

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 7
Gerrit-Owner: Sahil Takiar 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sahil Takiar 
Gerrit-Reviewer: Tim Armstrong