This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 613769d08e DRILL-8480: make Nested Loop Join operator properly process
empty batches and batches with new schema (#2897)
613769d08e is described below
commit 613769d08e6c4de6ef4cedbc03b6ae8fdedee5a3
Author: Maksym Rymar <[email protected]>
AuthorDate: Sun May 12 07:34:14 2024 +0300
DRILL-8480: make Nested Loop Join operator properly process empty batches
and batches with new schema (#2897)
---
.../exec/physical/impl/join/NestedLoopJoin.java | 7 ++-
.../physical/impl/join/NestedLoopJoinBatch.java | 9 +++-
.../physical/impl/join/NestedLoopJoinTemplate.java | 52 ++++++++++++----------
3 files changed, 41 insertions(+), 27 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
index 725c46d1ba..60b8cb169b 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.RecordBatch;
@@ -33,7 +34,9 @@ public interface NestedLoopJoin {
public static TemplateClassDefinition<NestedLoopJoin> TEMPLATE_DEFINITION =
new TemplateClassDefinition<>(NestedLoopJoin.class,
NestedLoopJoinTemplate.class);
- public void setupNestedLoopJoin(FragmentContext context, RecordBatch left,
+ public void setupNestedLoopJoin(FragmentContext context,
+ RecordBatch left,
+ RecordBatch.IterOutcome leftOutcome,
ExpandableHyperContainer rightContainer,
LinkedList<Integer> rightCounts,
NestedLoopJoinBatch outgoing);
@@ -41,7 +44,7 @@ public interface NestedLoopJoin {
void setTargetOutputCount(int targetOutputCount);
// Produce output records taking into account join type
- public int outputRecords(JoinRelType joinType);
+ public int outputRecords(JoinRelType joinType) throws SchemaChangeException;
// Project the record at offset 'leftIndex' in the left input batch into the
output container at offset 'outIndex'
public void emitLeft(int leftIndex, int outIndex);
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 4e45f7fea7..d50cb85c78 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -22,6 +22,7 @@ import java.util.LinkedList;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
+import org.apache.drill.common.exceptions.UserException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.calcite.rel.core.JoinRelType;
@@ -183,7 +184,7 @@ public class NestedLoopJoinBatch extends
AbstractBinaryRecordBatch<NestedLoopJoi
default:
}
}
- nljWorker.setupNestedLoopJoin(context, left, rightContainer, rightCounts, this);
+ nljWorker.setupNestedLoopJoin(context, left, leftUpstream, rightContainer, rightCounts, this);
state = BatchState.NOT_FIRST;
}
@@ -193,7 +194,11 @@ public class NestedLoopJoinBatch extends
AbstractBinaryRecordBatch<NestedLoopJoi
nljWorker.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
// invoke the runtime generated method to emit records in the output batch
- outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
+ try {
+ outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
+ } catch (SchemaChangeException e) {
+ throw UserException.schemaChangeError(e).build(logger);
+ }
// Set the record count
container.setValueCount(outputRecords);
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index daf23523da..04e39c5f0e 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -18,14 +18,16 @@
package org.apache.drill.exec.physical.impl.join;
import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
+
import javax.inject.Named;
import java.util.LinkedList;
import java.util.List;
@@ -41,8 +43,8 @@ public abstract class NestedLoopJoinTemplate implements
NestedLoopJoin {
// Current left input batch being processed
private RecordBatch left;
- // Record count of the left batch currently being processed
- private int leftRecordCount;
+ private BatchSchema leftSchema;
+ private RecordBatch.IterOutcome leftOutcome;
// List of record counts per batch in the hyper container
private List<Integer> rightCounts;
@@ -59,20 +61,23 @@ public abstract class NestedLoopJoinTemplate implements
NestedLoopJoin {
* Method initializes necessary state and invokes the doSetup() to set the
* input and output value vector references.
*
- * @param context Fragment context
- * @param left Current left input batch being processed
+ * @param context Fragment context
+ * @param left Current left input batch being processed
+ * @param leftOutcome Last left outcome
* @param rightContainer Hyper container
- * @param rightCounts Counts for each right container
- * @param outgoing Output batch
+ * @param rightCounts Counts for each right container
+ * @param outgoing Output batch
*/
@Override
public void setupNestedLoopJoin(FragmentContext context,
RecordBatch left,
+ RecordBatch.IterOutcome leftOutcome,
ExpandableHyperContainer rightContainer,
LinkedList<Integer> rightCounts,
NestedLoopJoinBatch outgoing) {
this.left = left;
- this.leftRecordCount = left.getRecordCount();
+ this.leftSchema = left.getSchema();
+ this.leftOutcome = leftOutcome;
this.rightCounts = rightCounts;
this.outgoing = outgoing;
doSetup(context, rightContainer, left, outgoing);
@@ -91,10 +96,13 @@ public abstract class NestedLoopJoinTemplate implements
NestedLoopJoin {
* @return the number of records produced in the output batch
*/
@Override
- public int outputRecords(JoinRelType joinType) {
+ public int outputRecords(JoinRelType joinType) throws SchemaChangeException {
int outputIndex = 0;
- while (leftRecordCount != 0) {
- outputIndex = populateOutgoingBatch(joinType, outputIndex);
+
+ while (leftOutcome != RecordBatch.IterOutcome.NONE && leftOutcome != RecordBatch.IterOutcome.NOT_YET) {
+ if (left.getRecordCount() != 0) {
+ outputIndex = populateOutgoingBatch(joinType, outputIndex);
+ }
if (outputIndex >= targetOutputRecords) {
break;
}
@@ -110,7 +118,7 @@ public abstract class NestedLoopJoinTemplate implements
NestedLoopJoin {
* If matching record is found both left and right records are written into
output batch,
* otherwise if join type is LEFT, than only left record is written, right
batch record values will be null.
*
- * @param joinType join type (INNER or LEFT)
+ * @param joinType join type (INNER or LEFT)
* @param outputIndex index to start emitting records at
* @return final outputIndex after producing records in the output batch
*/
@@ -123,7 +131,7 @@ public abstract class NestedLoopJoinTemplate implements
NestedLoopJoin {
outer:
// for every record in the left batch
- for (; nextLeftRecordToProcess < leftRecordCount; nextLeftRecordToProcess++) {
+ for (; nextLeftRecordToProcess < left.getRecordCount(); nextLeftRecordToProcess++) {
// for every batch on the right
for (; nextRightBatchToProcess < rightCounts.size();
nextRightBatchToProcess++) {
int rightRecordCount = rightCounts.get(nextRightBatchToProcess);
@@ -176,27 +184,25 @@ public abstract class NestedLoopJoinTemplate implements
NestedLoopJoin {
* Resets some internal state which indicates the next records to process in
the left and right batches,
* also fetches the next left input batch.
*/
- private void resetAndGetNextLeft(int outputIndex) {
+ private void resetAndGetNextLeft(int outputIndex) throws SchemaChangeException {
for (VectorWrapper<?> vw : left) {
vw.getValueVector().clear();
}
tracker.reset();
- RecordBatch.IterOutcome leftOutcome = outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left);
+
+ leftOutcome = outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left);
switch (leftOutcome) {
case OK_NEW_SCHEMA:
- throw new DrillRuntimeException("Nested loop join does not handle schema change. Schema change" +
- " found on the left side of NLJ.");
- case NONE:
- case NOT_YET:
- leftRecordCount = 0;
- break;
+ if (!left.getSchema().equals(leftSchema)) {
+ throw SchemaChangeException.schemaChanged("Nested loop join does not handle schema change. Schema change" +
+ " found on the left side of NLJ.", leftSchema, left.getSchema());
+ }
case OK:
- outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex);
+ outgoing.getBatchMemoryManager().update(left, LEFT_INDEX, outputIndex);
setTargetOutputCount(outgoing.getBatchMemoryManager().getCurrentOutgoingMaxRowCount());
// calculated by update()
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX),
outgoing.getRecordBatchStatsContext());
- leftRecordCount = left.getRecordCount();
break;
default:
}