Github user ppadma commented on a diff in the pull request:
https://github.com/apache/drill/pull/1107#discussion_r166427892
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---
@@ -102,20 +105,78 @@
private final List<Comparator> comparators;
private final JoinRelType joinType;
private JoinWorker worker;
+ private final long outputBatchSize;
private static final String LEFT_INPUT = "LEFT INPUT";
private static final String RIGHT_INPUT = "RIGHT INPUT";
+ private class MergeJoinMemoryManager extends AbstractRecordBatchMemoryManager {
+ private int leftRowWidth;
+ private int rightRowWidth;
+
+ /**
+ * mergejoin operates on one record at a time from the left and right batches
+ * using RecordIterator abstraction. We have a callback mechanism to get notified
+ * when new batch is loaded in record iterator.
+ * This can get called in the middle of current output batch we are building.
+ * when this gets called, adjust number of output rows for the current batch and
+ * update the value to be used for subsequent batches.
+ */
+ @Override
+ public void update(int inputIndex) {
+ switch(inputIndex) {
+ case 0:
+ final RecordBatchSizer leftSizer = new RecordBatchSizer(left);
+ leftRowWidth = leftSizer.netRowWidth();
+ break;
+ case 1:
+ final RecordBatchSizer rightSizer = new RecordBatchSizer(right);
+ rightRowWidth = rightSizer.netRowWidth();
+ default:
+ break;
+ }
+
+ final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
+
+ // If outgoing row width is 0, just return. This is possible for empty batches or
+ // when first set of batches come with OK_NEW_SCHEMA and no data.
+ if (newOutgoingRowWidth == 0) {
+ return;
+ }
+
+ // update the value to be used for next batch(es)
+ setOutputRowCount(Math.min(ValueVector.MAX_ROW_COUNT,
+ Math.max(RecordBatchSizer.safeDivide(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR, newOutgoingRowWidth), MIN_NUM_ROWS)));
+
+ // Adjust for the current batch.
+ // calculate memory used so far based on previous outgoing row width and how many rows we already processed.
+ final long memoryUsed = status.getOutPosition() * getOutgoingRowWidth();
+ // This is the remaining memory.
+ final long remainingMemory = Math.max(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR - memoryUsed, 0);
+ // These are number of rows we can fit in remaining memory based on new outgoing row width.
+ final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);
+
+ final int adjustedOutputRowCount = Math.min(MAX_NUM_ROWS, Math.max(status.getOutPosition() + numOutputRowsRemaining, MIN_NUM_ROWS));
+ status.setOutputRowCount(adjustedOutputRowCount);
+ setOutgoingRowWidth(newOutgoingRowWidth);
--- End diff --
Yes, this is how it works.
We read from the left and right sides through the RecordIterator abstraction,
which reads full record batches underneath and returns one record at a time
with each next call. The record iterator invokes a callback whenever it loads a
new batch, and I use that callback to update the row widths. In the callback I
do two things: I adjust the row count of the output batch we are currently
building, based on the memory still available for that batch, and I compute and
save the row count to use for the next full batch. In innerNext, when we start
working on the next output batch, I set the target output row count from this
saved value.
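To make the arithmetic concrete, here is a minimal standalone sketch of the two calculations. It is not the code from the PR: the class name, the constant values, and the divideOrZero helper are assumptions made for illustration. The helper only stands in for RecordBatchSizer.safeDivide, whose rounding behaviour is not shown in the diff, so the sketch simply rounds down.

```java
// Illustrative sketch only; names and constants are hypothetical, not Drill's.
class RowCountAdjustment {
    static final int MIN_NUM_ROWS = 1;          // assumed lower bound
    static final int MAX_NUM_ROWS = 65536;      // assumed upper bound
    static final int FRAGMENTATION_FACTOR = 2;  // assumed worst-case factor

    // Stand-in for a "safe divide": returns 0 instead of failing on a zero divisor.
    static int divideOrZero(long numerator, long denominator) {
        return denominator == 0 ? 0 : (int) (numerator / denominator);
    }

    // Row count to use for the next full output batch, given the new row width.
    static int rowCountForNextBatch(long outputBatchSize, int newRowWidth) {
        long budget = outputBatchSize / FRAGMENTATION_FACTOR;
        int rows = divideOrZero(budget, newRowWidth);
        return Math.min(MAX_NUM_ROWS, Math.max(rows, MIN_NUM_ROWS));
    }

    // Row count for the batch being built when the row width changes mid-batch.
    static int adjustedRowCountForCurrentBatch(long outputBatchSize,
                                               int rowsAlreadyWritten,
                                               int oldRowWidth,
                                               int newRowWidth) {
        long budget = outputBatchSize / FRAGMENTATION_FACTOR;
        // Memory consumed so far, measured with the previous row width.
        long memoryUsed = (long) rowsAlreadyWritten * oldRowWidth;
        long remainingMemory = Math.max(budget - memoryUsed, 0);
        // Rows that still fit in the remaining memory at the new row width.
        int rowsRemaining = divideOrZero(remainingMemory, newRowWidth);
        return Math.min(MAX_NUM_ROWS,
                Math.max(rowsAlreadyWritten + rowsRemaining, MIN_NUM_ROWS));
    }

    public static void main(String[] args) {
        // 16384-byte batch, 100 rows written at 32 bytes each, width grows to 64.
        System.out.println(adjustedRowCountForCurrentBatch(16384, 100, 32, 64)); // 178
        System.out.println(rowCountForNextBatch(16384, 64));                     // 128
    }
}
```

With these assumed numbers the budget is 8192 bytes, of which 3200 are already used, so 78 more rows of 64 bytes fit in the current batch (178 in total), and each subsequent batch targets 128 rows.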
I have addressed all the other code review comments. Please take a look when
you get a chance.
---