parthchandra commented on a change in pull request #1401: DRILL-6616: Batch Processing for Lateral/Unnest
URL: https://github.com/apache/drill/pull/1401#discussion_r205897581
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
 ##########
 @@ -83,55 +109,80 @@ public void setOutputCount(int outputCount) {
     outputLimit = outputCount;
   }
 
+  @Override
+  public void setRowIdVector(IntVector v) {
+    this.rowIdVector = v;
+    this.rowIdVectorMutator = rowIdVector.getMutator();
+  }
+
   @Override
   public final int unnestRecords(final int recordCount) {
     Preconditions.checkArgument(svMode == NONE, "Unnest does not support 
selection vector inputs.");
-    if (innerValueIndex == -1) {
-      innerValueIndex = 0;
-    }
-
-    // Current record being processed in the incoming record batch. We could 
keep
-    // track of it ourselves, but it is better to check with the Lateral Join 
and get the
-    // current record being processed thru the Lateral Join Contract.
-    final int currentRecord = lateral.getRecordIndex();
-    final int innerValueCount = accessor.getInnerValueCountAt(currentRecord);
-    final int count = Math.min(Math.min(innerValueCount, outputLimit), 
recordCount);
 
-    logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record 
count: {}, output limit: {}", innerValueCount,
-        recordCount, outputLimit);
+    final int initialInnerValueIndex = runningInnerValueIndex;
+
+    outer:
+    {
+      int outputIndex = 0; // index in the output vector that we are writing to
+      final int valueCount = accessor.getValueCount();
+
+      for (; valueIndex < valueCount; valueIndex++) {
+        final int innerValueCount = accessor.getInnerValueCountAt(valueIndex);
+        logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record 
count: {}, output limit: {}",
+            innerValueCount, recordCount, outputLimit);
+
+        for (; innerValueIndex < innerValueCount; innerValueIndex++) {
+          // If we've hit the batch size limit, stop and flush what we've got 
so far.
+          if (outputIndex == outputLimit) {
+            // Flush this batch.
+            break outer;
+          }
+          try {
+            // rowId starts at 1, so the value for rowId is valueIndex+1
+            rowIdVectorMutator.setSafe(outputIndex, valueIndex + 1);
+
+          } finally {
+            outputIndex++;
+            //currentInnerValueIndexLocal++;
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to