This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 613769d08e DRILL-8480: make Nested Loop Join operator properly process empty batches and batches with new schema (#2897)
613769d08e is described below

commit 613769d08e6c4de6ef4cedbc03b6ae8fdedee5a3
Author: Maksym Rymar <[email protected]>
AuthorDate: Sun May 12 07:34:14 2024 +0300

    DRILL-8480: make Nested Loop Join operator properly process empty batches and batches with new schema (#2897)
---
 .../exec/physical/impl/join/NestedLoopJoin.java    |  7 ++-
 .../physical/impl/join/NestedLoopJoinBatch.java    |  9 +++-
 .../physical/impl/join/NestedLoopJoinTemplate.java | 52 ++++++++++++----------
 3 files changed, 41 insertions(+), 27 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
index 725c46d1ba..60b8cb169b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.join;
 
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.RecordBatch;
@@ -33,7 +34,9 @@ public interface NestedLoopJoin {
   public static TemplateClassDefinition<NestedLoopJoin> TEMPLATE_DEFINITION =
       new TemplateClassDefinition<>(NestedLoopJoin.class, 
NestedLoopJoinTemplate.class);
 
-  public void setupNestedLoopJoin(FragmentContext context, RecordBatch left,
+  public void setupNestedLoopJoin(FragmentContext context,
+                                  RecordBatch left,
+                                  RecordBatch.IterOutcome leftOutcome,
                                   ExpandableHyperContainer rightContainer,
                                   LinkedList<Integer> rightCounts,
                                   NestedLoopJoinBatch outgoing);
@@ -41,7 +44,7 @@ public interface NestedLoopJoin {
   void setTargetOutputCount(int targetOutputCount);
 
   // Produce output records taking into account join type
-  public int outputRecords(JoinRelType joinType);
+  public int outputRecords(JoinRelType joinType) throws SchemaChangeException;
 
   // Project the record at offset 'leftIndex' in the left input batch into the 
output container at offset 'outIndex'
   public void emitLeft(int leftIndex, int outIndex);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 4e45f7fea7..d50cb85c78 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -22,6 +22,7 @@ import java.util.LinkedList;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.drill.common.exceptions.UserException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -183,7 +184,7 @@ public class NestedLoopJoinBatch extends 
AbstractBinaryRecordBatch<NestedLoopJoi
           default:
         }
       }
-      nljWorker.setupNestedLoopJoin(context, left, rightContainer, 
rightCounts, this);
+      nljWorker.setupNestedLoopJoin(context, left, leftUpstream, 
rightContainer, rightCounts, this);
       state = BatchState.NOT_FIRST;
     }
 
@@ -193,7 +194,11 @@ public class NestedLoopJoinBatch extends 
AbstractBinaryRecordBatch<NestedLoopJoi
     nljWorker.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
 
     // invoke the runtime generated method to emit records in the output batch
-    outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
+    try {
+      outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
+    } catch (SchemaChangeException e) {
+      throw UserException.schemaChangeError(e).build(logger);
+    }
 
     // Set the record count
     container.setValueCount(outputRecords);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index daf23523da..04e39c5f0e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -18,14 +18,16 @@
 package org.apache.drill.exec.physical.impl.join;
 
 import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.util.record.RecordBatchStats;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
+
 import javax.inject.Named;
 import java.util.LinkedList;
 import java.util.List;
@@ -41,8 +43,8 @@ public abstract class NestedLoopJoinTemplate implements 
NestedLoopJoin {
   // Current left input batch being processed
   private RecordBatch left;
 
-  // Record count of the left batch currently being processed
-  private int leftRecordCount;
+  private BatchSchema leftSchema;
+  private RecordBatch.IterOutcome leftOutcome;
 
   // List of record counts per batch in the hyper container
   private List<Integer> rightCounts;
@@ -59,20 +61,23 @@ public abstract class NestedLoopJoinTemplate implements 
NestedLoopJoin {
    * Method initializes necessary state and invokes the doSetup() to set the
    * input and output value vector references.
    *
-   * @param context Fragment context
-   * @param left Current left input batch being processed
+   * @param context        Fragment context
+   * @param left           Current left input batch being processed
+   * @param leftOutcome    Last left outcome
    * @param rightContainer Hyper container
-   * @param rightCounts Counts for each right container
-   * @param outgoing Output batch
+   * @param rightCounts    Counts for each right container
+   * @param outgoing       Output batch
    */
   @Override
   public void setupNestedLoopJoin(FragmentContext context,
                                   RecordBatch left,
+                                  RecordBatch.IterOutcome leftOutcome,
                                   ExpandableHyperContainer rightContainer,
                                   LinkedList<Integer> rightCounts,
                                   NestedLoopJoinBatch outgoing) {
     this.left = left;
-    this.leftRecordCount = left.getRecordCount();
+    this.leftSchema = left.getSchema();
+    this.leftOutcome = leftOutcome;
     this.rightCounts = rightCounts;
     this.outgoing = outgoing;
     doSetup(context, rightContainer, left, outgoing);
@@ -91,10 +96,13 @@ public abstract class NestedLoopJoinTemplate implements 
NestedLoopJoin {
    * @return the number of records produced in the output batch
    */
   @Override
-  public int outputRecords(JoinRelType joinType) {
+  public int outputRecords(JoinRelType joinType) throws SchemaChangeException {
     int outputIndex = 0;
-    while (leftRecordCount != 0) {
-      outputIndex = populateOutgoingBatch(joinType, outputIndex);
+
+    while (leftOutcome != RecordBatch.IterOutcome.NONE && leftOutcome != 
RecordBatch.IterOutcome.NOT_YET) {
+      if (left.getRecordCount() != 0) {
+        outputIndex = populateOutgoingBatch(joinType, outputIndex);
+      }
       if (outputIndex >= targetOutputRecords) {
         break;
       }
@@ -110,7 +118,7 @@ public abstract class NestedLoopJoinTemplate implements 
NestedLoopJoin {
    * If matching record is found both left and right records are written into 
output batch,
    * otherwise if join type is LEFT, than only left record is written, right 
batch record values will be null.
    *
-   * @param joinType join type (INNER or LEFT)
+   * @param joinType    join type (INNER or LEFT)
    * @param outputIndex index to start emitting records at
    * @return final outputIndex after producing records in the output batch
    */
@@ -123,7 +131,7 @@ public abstract class NestedLoopJoinTemplate implements 
NestedLoopJoin {
 
     outer:
     // for every record in the left batch
-    for (; nextLeftRecordToProcess < leftRecordCount; 
nextLeftRecordToProcess++) {
+    for (; nextLeftRecordToProcess < left.getRecordCount(); 
nextLeftRecordToProcess++) {
       // for every batch on the right
       for (; nextRightBatchToProcess < rightCounts.size(); 
nextRightBatchToProcess++) {
         int rightRecordCount = rightCounts.get(nextRightBatchToProcess);
@@ -176,27 +184,25 @@ public abstract class NestedLoopJoinTemplate implements 
NestedLoopJoin {
    * Resets some internal state which indicates the next records to process in 
the left and right batches,
    * also fetches the next left input batch.
    */
-  private void resetAndGetNextLeft(int outputIndex) {
+  private void resetAndGetNextLeft(int outputIndex) throws 
SchemaChangeException {
     for (VectorWrapper<?> vw : left) {
       vw.getValueVector().clear();
     }
     tracker.reset();
-    RecordBatch.IterOutcome leftOutcome = 
outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left);
+
+    leftOutcome = outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left);
     switch (leftOutcome) {
       case OK_NEW_SCHEMA:
-        throw new DrillRuntimeException("Nested loop join does not handle 
schema change. Schema change" +
-            " found on the left side of NLJ.");
-      case NONE:
-      case NOT_YET:
-        leftRecordCount = 0;
-        break;
+        if (!left.getSchema().equals(leftSchema)) {
+          throw SchemaChangeException.schemaChanged("Nested loop join does not 
handle schema change. Schema change" +
+            " found on the left side of NLJ.", leftSchema, left.getSchema());
+        }
       case OK:
-        outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex);
+        outgoing.getBatchMemoryManager().update(left, LEFT_INDEX, outputIndex);
         
setTargetOutputCount(outgoing.getBatchMemoryManager().getCurrentOutgoingMaxRowCount());
 // calculated by update()
         RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
           outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX),
           outgoing.getRecordBatchStatsContext());
-        leftRecordCount = left.getRecordCount();
         break;
       default:
     }

Reply via email to