[
https://issues.apache.org/jira/browse/DRILL-6418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481764#comment-16481764
]
ASF GitHub Bot commented on DRILL-6418:
---------------------------------------
asfgit closed pull request #1271: DRILL-6418: Handle Schema change in Unnest
And Lateral for unnest fie…
URL: https://github.com/apache/drill/pull/1271
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 8ea381bc1c..a09913fd8d 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -125,8 +125,8 @@ public IterOutcome innerNext() {
// Left side has some records in the batch so let's process right batch
childOutcome = processRightBatch();
- // reset the left & right outcomes to OK here and send the empty batch
downstream
- // Assumption being right side will always send OK_NEW_SCHEMA with empty
batch which is what UNNEST will do
+ // reset the left & right outcomes to OK here and send the empty batch
downstream. Non-Empty right batch with
+ // OK_NEW_SCHEMA will be handled in subsequent next call
if (childOutcome == OK_NEW_SCHEMA) {
leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
rightUpstream = OK;
@@ -344,22 +344,7 @@ private IterOutcome processLeftBatch() {
switch (leftUpstream) {
case OK_NEW_SCHEMA:
// This OK_NEW_SCHEMA is received post build schema phase and from
left side
- // If schema didn't actually changed then just handle it as OK
outcome. This is fine since it is not setting
- // up any incoming vector references in setupNewSchema. While
copying the records it always work on latest
- // incoming vector.
- if (!isSchemaChanged(left.getSchema(), leftSchema)) {
- logger.warn(String.format("New schema received from left side is
same as previous known left schema. " +
- "Ignoring this schema change. Old Left Schema: %s, New Left
Schema: %s", leftSchema, left.getSchema()));
-
- // Current left batch is empty and schema didn't changed as well,
so let's get next batch and loose
- // OK_NEW_SCHEMA outcome
- processLeftBatchInFuture = false;
- if (emptyLeftBatch) {
- continue;
- } else {
- leftUpstream = OK;
- }
- } else if (outputIndex > 0) { // can only reach here from
produceOutputBatch
+ if (outputIndex > 0) { // can only reach here from produceOutputBatch
// This means there is already some records from previous join
inside left batch
// So we need to pass that downstream and then handle the
OK_NEW_SCHEMA in subsequent next call
processLeftBatchInFuture = true;
@@ -439,20 +424,12 @@ private IterOutcome processRightBatch() {
// We should not get OK_NEW_SCHEMA multiple times for the same left
incoming batch. So there won't be a
// case where we get OK_NEW_SCHEMA --> OK (with batch) --->
OK_NEW_SCHEMA --> OK/EMIT fall through
//
- // Right batch with OK_NEW_SCHEMA is always going to be an empty
batch, so let's pass the new schema
- // downstream and later with subsequent next() call the join output
will be produced
- Preconditions.checkState(right.getRecordCount() == 0,
- "Right side batch with OK_NEW_SCHEMA is not empty");
-
- if (!isSchemaChanged(right.getSchema(), rightSchema)) {
- logger.warn(String.format("New schema received from right side is
same as previous known right schema. " +
- "Ignoring this schema change. Old Right schema: %s, New Right
Schema: %s",
- rightSchema, right.getSchema()));
- continue;
- }
+ // Right batch with OK_NEW_SCHEMA can be non-empty so update the
rightJoinIndex correctly and pass the
+ // new schema downstream with empty batch and later with subsequent
next() call the join output will be
+ // produced
if (handleSchemaChange()) {
container.setRecordCount(0);
- rightJoinIndex = -1;
+ rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
return OK_NEW_SCHEMA;
} else {
return STOP;
@@ -637,10 +614,10 @@ private void finalizeOutputContainer() {
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
batchMemoryManager.updateOutgoingStats(outputIndex);
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
- logger.debug("Number of records emitted: " + outputIndex);
- }
+
+ logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+ logger.debug("Number of records emitted: {} and Allocator Stats:
[AllocatedMem: {}, PeakMem: {}]", outputIndex,
+ container.getAllocator().getAllocatedMemory(),
container.getAllocator().getPeakMemoryAllocation());
// Update the output index for next output batch to zero
outputIndex = 0;
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index eab90071ab..8a88db9fef 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -526,7 +526,7 @@ private void setupNewSchemaFromInput(RecordBatch
incomingBatch) throws SchemaCha
@Override
protected boolean setupNewSchema() throws SchemaChangeException {
setupNewSchemaFromInput(this.incoming);
- if (container.isSchemaChanged()) {
+ if (container.isSchemaChanged() || callBack.getSchemaChangedAndReset()) {
container.buildSchema(SelectionVectorMode.NONE);
return true;
} else {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index fe91fc36a8..ed5d91c22c 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -389,11 +389,15 @@ private boolean schemaChanged() {
final MaterializedField thisField =
incoming.getSchema().getColumn(fieldId.getFieldIds()[0]);
final MaterializedField prevField = unnestFieldMetadata;
Preconditions.checkNotNull(thisField);
- unnestFieldMetadata = thisField;
+
// isEquivalent may return false if the order of the fields has changed.
This usually does not
// happen but if it does we end up throwing a spurious schema change
exeption
if (prevField == null || !prevField.isEquivalent(thisField)) {
logger.debug("Schema changed");
+ // We should store the clone of MaterializedField for unnest column
instead of reference. When the column is of
+ // type Map and there is change in any children field of the Map then
that will update the reference variable and
+ // isEquivalent check will still return true.
+ unnestFieldMetadata = thisField.clone();
return true;
}
return false;
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 67598e0132..f16123443f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -100,8 +100,29 @@ public int hashCode() {
return result;
}
- // DRILL-5525: the semantics of this method are badly broken.
- // Caveat emptor.
+ /**
+ * DRILL-5525: the semantics of this method are badly broken.
+ * Caveat emptor.
+ *
+ * This check used for detecting actual schema change inside operator record
batch will not work for
+ * AbstractContainerVectors (like MapVector). In each record batch a
reference to incoming batch schema is
+ * stored (let say S:{a: int}) and then equals is called on that stored
reference and current incoming batch schema.
+ * Internally schema object has references to Materialized fields from
vectors in container. If there is change in
+ * incoming batch schema, then the upstream will create a new ValueVector in
its output container with the new
+ * detected type, which in turn will have new instance for Materialized
Field. Then later a new BatchSchema object
+ * is created for this new incoming batch (let say S":{a":varchar}). The
operator calling equals will have reference
+ * to old schema object (S) and hence first check will not be satisfied and
then it will call equals on each of the
+ * Materialized Field (a.equals(a")). Since new materialized field is
created for newly created vector the equals
+ * check on field will return false. And schema change will be detected in
this case.
+ * Now consider instead of int vector there is a MapVector such that initial
schema was (let say S:{a:{b:int, c:int}}
+ * and then later schema for Map field c changes, then in container Map
vector will be found but later the children
+ * vector for field c will be replaced. This new schema object will be
created as (S":{a:{b:int, c":varchar}}). Now
+ * when S.equals(S") is called it will eventually call a.equals(a) which
will return true even though the schema of
+ * children value vector c has changed. This is because no new vector is
created for field (a) and hence it's object
+ * reference to MaterializedField has not changed which will be reflected in
both old and new schema instances.
+ * Hence we should make use of {@link BatchSchema#isEquivalent(BatchSchema)}
method instead since
+ * {@link MaterializedField#isEquivalent(MaterializedField)} method is
updated to remove the reference check.
+ */
@Override
public boolean equals(Object obj) {
@@ -151,7 +172,7 @@ public boolean equals(Object obj) {
/**
* Compare that two schemas are identical according to the rules defined
- * in {@ link MaterializedField#isEquivalent(MaterializedField)}. In
particular,
+ * in {@link MaterializedField#isEquivalent(MaterializedField)}. In
particular,
* this method requires that the fields have a 1:1 ordered correspondence
* in the two schemas.
*
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index e35bb5f30c..0ea23f709d 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -135,6 +135,15 @@ public void transferOut(VectorContainer containerOut) {
return addOrGet(field, null);
}
+ /**
+ * This method should be called with MaterializedField which also has
correct children field list specially when
+ * the field type is MAP. Otherwise after calling this method if caller is
not creating TransferPair on the
+ * ValueVector, then the new ValueVector will not have information about
it's list of children MaterializedField.
+ * @param field
+ * @param callBack
+ * @param <T>
+ * @return
+ */
@SuppressWarnings("unchecked")
public <T extends ValueVector> T addOrGet(final MaterializedField field,
final SchemaChangeCallBack callBack) {
final TypedFieldId id =
getValueVectorId(SchemaPath.getSimplePath(field.getName()));
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 51df6e4cd5..e9e9aacc6e 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -45,6 +45,7 @@
import java.util.List;
import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category(OperatorTest.class)
@@ -853,8 +854,8 @@ public void
testHandlingUnexpectedSchemaChangeForUnnestField() throws Exception
}
/**
- * When multiple left batch is received with same schema but with
OK_NEW_SCHEMA, then LATERAL detects that
- * correctly and suppresses schema change operation by producing output in
same batch created with initial schema.
+ * When multiple left batch is received with same schema but with
OK_NEW_SCHEMA, then LATERAL rebuilds the
+ * schema each time and sends output in multiple output batches
* The schema change was only for columns which are not produced by the
UNNEST or right branch.
*
* @throws Exception
@@ -904,6 +905,8 @@ public void
testOK_NEW_SCHEMA_WithNoActualSchemaChange_ForNonUnnestField() throw
assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
totalRecordCount += ljBatch.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+ totalRecordCount += ljBatch.getRecordCount();
assertTrue(totalRecordCount ==
(nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
@@ -922,9 +925,8 @@ public void
testOK_NEW_SCHEMA_WithNoActualSchemaChange_ForNonUnnestField() throw
}
/**
- * When multiple left batch is received with same schema but with
OK_NEW_SCHEMA, then LATERAL detects that
- * correctly and suppresses false schema change indication from both left
and right branch. It produces output in
- * same batch created with initial schema.
+ * When multiple left batch is received with same schema but with
OK_NEW_SCHEMA, then LATERAL correctly
+ * handles it by re-creating the schema and producing multiple batches of
final output
* The schema change is for columns common on both left and right side.
*
* @throws Exception
@@ -976,6 +978,9 @@ public void
testOK_NEW_SCHEMA_WithNoActualSchemaChange_ForUnnestField() throws E
assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
totalRecordCount += ljBatch.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+ assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+ totalRecordCount += ljBatch.getRecordCount();
assertTrue(totalRecordCount ==
(nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
@@ -2560,4 +2565,241 @@ public void
test_OK_NEW_SCHEMAFromLeft_EmitFromRight_PostBuildSchema() throws Ex
rightMockBatch.close();
}
}
+
+ /**
+ * Verifies that if a non-empty batch with OK_NEW_SCHEMA is received from
right side post buildSchema phase then it
+ * is handled correctly by sending an empty batch with OK_NEW_SCHEMA and
later consuming it to produce actual
+ * output batch with some data
+ */
+ @Test
+ public void testPostBuildSchema_OK_NEW_SCHEMA_NonEmptyRightBatch() throws
Exception {
+ // Create left input schema 2
+ TupleMetadata leftSchema2 = new SchemaBuilder()
+ .add("id_left", TypeProtos.MinorType.INT)
+ .add("cost_left", TypeProtos.MinorType.VARCHAR)
+ .add("name_left", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ // Create right input schema
+ TupleMetadata rightSchema2 = new SchemaBuilder()
+ .add("id_right", TypeProtos.MinorType.INT)
+ .add("cost_right", TypeProtos.MinorType.VARCHAR)
+ .add("name_right", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+
+ // Create data for left input
+ final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
+ .addRow(2, "20", "item20")
+ .build();
+
+ // Create data for right input
+ final RowSet.SingleRowSet emptyRightRowSet2 =
fixture.rowSetBuilder(rightSchema2)
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyRightRowSet2 =
fixture.rowSetBuilder(rightSchema2)
+ .addRow(4, "41", "item41")
+ .addRow(5, "51", "item51")
+ .build();
+
+ // Get the left container with dummy data for Lateral Join
+ leftContainer.add(nonEmptyLeftRowSet.container());
+ leftContainer.add(leftRowSet2.container());
+
+ // Get the left IterOutcomes for Lateral Join
+ leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+ // Create Left MockRecordBatch
+ final CloseableRecordBatch leftMockBatch = new
MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+ // Get the right container with dummy data
+ // first OK_NEW_SCHEMA batch
+ rightContainer.add(emptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet2.container()); // non-empty
OK_NEW_SCHEMA batch
+ rightContainer.add(emptyRightRowSet2.container());
+
+ rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final CloseableRecordBatch rightMockBatch = new
MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+ final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig,
fixture.getFragmentContext(),
+ leftMockBatch, rightMockBatch);
+
+ try {
+ int totalRecordCount = 0;
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+ assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+ totalRecordCount += ljBatch.getRecordCount();
+ // This means 2 output record batches were received because of Schema
change
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+ assertEquals(0, ljBatch.getRecordCount());
+ totalRecordCount += ljBatch.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+ totalRecordCount += ljBatch.getRecordCount();
+ assertTrue(totalRecordCount ==
+ (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
+ leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
+
+ assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+ } catch (AssertionError | Exception error) {
+ fail();
+ } finally {
+ // Close all the resources for this test case
+ ljBatch.close();
+ leftMockBatch.close();
+ rightMockBatch.close();
+ leftRowSet2.clear();
+ emptyRightRowSet2.clear();
+ nonEmptyRightRowSet2.clear();
+ }
+ }
+
+ /**
+ * Test to verify in case of Multilevel lateral when a non-empty
OK_NEW_SCHEMA batch post build schema phase is
+ * received from right most UNNEST of lower LATERAL then pipeline works fine.
+ * @throws Exception
+ */
+ @Test
+ public void
testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() throws
Exception {
+ // ** Prepare first pair of left batch and right batch for lower level
LATERAL Lateral_1 **
+
+ // Create left input schema for first batch
+ TupleMetadata leftSchema2 = new SchemaBuilder()
+ .add("id_left_new", TypeProtos.MinorType.INT)
+ .add("cost_left_new", TypeProtos.MinorType.INT)
+ .add("name_left_new", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ final RowSet.SingleRowSet emptyLeftRowSet_leftSchema2 =
fixture.rowSetBuilder(leftSchema2).build();
+ final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema2 =
fixture.rowSetBuilder(leftSchema2)
+ .addRow(6, 60, "item6")
+ .build();
+
+ leftContainer.add(emptyLeftRowSet.container());
+ leftContainer.add(nonEmptyLeftRowSet.container());
+ leftContainer.add(emptyLeftRowSet_leftSchema2.container());
+ leftContainer.add(nonEmptyLeftRowSet_leftSchema2.container());
+
+ // Get the left IterOutcomes for Lateral Join
+ leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final CloseableRecordBatch leftMockBatch_1 = new
MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+ // Get the right container with dummy data
+ TupleMetadata rightSchema2 = new SchemaBuilder()
+ .add("id_right_new", TypeProtos.MinorType.INT)
+ .add("cost_right_new", TypeProtos.MinorType.VARCHAR)
+ .add("name_right_new", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ final RowSet.SingleRowSet emptyRightRowSet_rightSchema2 =
fixture.rowSetBuilder(rightSchema2).build();
+ final RowSet.SingleRowSet nonEmptyRightRowSet_rightSchema2 =
fixture.rowSetBuilder(rightSchema2)
+ .addRow(5, "51", "item51")
+ .addRow(6, "61", "item61")
+ .addRow(7, "71", "item71")
+ .build();
+
+ rightContainer.add(emptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet_rightSchema2.container()); //
non-empty batch with Ok_new_schema
+ rightContainer.add(emptyRightRowSet_rightSchema2.container());
+
+ rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final CloseableRecordBatch rightMockBatch_1 = new
MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.FULL);
+
+ final LateralJoinBatch lowerLevelLateral = new
LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
+ leftMockBatch_1, rightMockBatch_1);
+
+ // ** Prepare second pair of left and right batch for upper level
Lateral_2 **
+
+ // Create left input schema for first batch
+ TupleMetadata leftSchema3 = new SchemaBuilder()
+ .add("id_left_left", TypeProtos.MinorType.INT)
+ .add("cost_left_left", TypeProtos.MinorType.INT)
+ .add("name_left_left", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ final RowSet.SingleRowSet emptyLeftRowSet_leftSchema3 =
fixture.rowSetBuilder(leftSchema3).build();
+ final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema3 =
fixture.rowSetBuilder(leftSchema3)
+ .addRow(6, 60, "item6")
+ .build();
+
+ // Get left input schema for second left batch
+ TupleMetadata leftSchema4 = new SchemaBuilder()
+ .add("id_left_left_new", TypeProtos.MinorType.INT)
+ .add("cost_left_left_new", TypeProtos.MinorType.VARCHAR)
+ .add("name_left_left_new", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema4 =
fixture.rowSetBuilder(leftSchema4)
+ .addRow(100, "100", "item100")
+ .build();
+
+ // Build Left container for upper level LATERAL operator
+ final List<VectorContainer> leftContainer2 = new ArrayList<>(5);
+
+ // Get the left container with dummy data
+ leftContainer2.add(emptyLeftRowSet_leftSchema3.container());
+ leftContainer2.add(nonEmptyLeftRowSet_leftSchema3.container());
+ leftContainer2.add(nonEmptyLeftRowSet_leftSchema4.container());
+
+ // Get the left container outcomes for upper level LATERAL operator
+ final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5);
+ leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ leftOutcomes2.add(RecordBatch.IterOutcome.OK);
+ leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+ final CloseableRecordBatch leftMockBatch_2 = new
MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+ leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema());
+
+ final LateralJoinBatch upperLevelLateral = new
LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
+ leftMockBatch_2, lowerLevelLateral);
+
+ try {
+ // 3 for first batch on left side and another 3 for next left batch
+ final int expectedOutputRecordCount = 6;
+ int actualOutputRecordCount = 0;
+
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA ==
upperLevelLateral.next());
+ assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
+ actualOutputRecordCount += upperLevelLateral.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA ==
upperLevelLateral.next());
+ actualOutputRecordCount += upperLevelLateral.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA ==
upperLevelLateral.next());
+ actualOutputRecordCount += upperLevelLateral.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
+ actualOutputRecordCount += upperLevelLateral.getRecordCount();
+ assertTrue(RecordBatch.IterOutcome.NONE == upperLevelLateral.next());
+ assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
+ } catch (AssertionError | Exception error) {
+ fail();
+ } finally {
+ // Close all the resources for this test case
+ upperLevelLateral.close();
+ leftMockBatch_2.close();
+ lowerLevelLateral.close();
+ leftMockBatch_1.close();
+ rightMockBatch_1.close();
+ leftContainer2.clear();
+ leftOutcomes2.clear();
+ }
+ }
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
index 4757488559..38c0f51866 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
@@ -17,16 +17,19 @@
*/
package org.apache.drill.exec.physical.impl.limit;
+import org.apache.drill.categories.OperatorTest;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.test.rowSet.RowSet;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+@Category(OperatorTest.class)
public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
/**
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
index b3099d0b24..cc737e9793 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
@@ -17,17 +17,20 @@
*/
package org.apache.drill.exec.physical.impl.project;
+import org.apache.drill.categories.OperatorTest;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.test.rowSet.RowSet;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+@Category(OperatorTest.class)
public class TestProjectEmitOutcome extends BaseTestOpBatchEmitOutcome {
/**
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index ec043b26e8..70a32f8f4d 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -223,8 +223,12 @@ public void testUnnestMultipleNewSchemaIncoming() {
Object[][][] baseline = {
{
- {1, 1, 2, 2, 2, 3, 3, 4},
- {"0", "1", "2", "3", "4", "5", "6", "9"}
+ {1, 1, 2, 2, 2, 3, 3},
+ {"0", "1", "2", "3", "4", "5", "6"}
+ },
+ {
+ {4},
+ {"9"}
}
};
diff --git
a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index fa4d2767e7..672bb7e0c4 100644
---
a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++
b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -111,7 +111,7 @@ public void removeChild(MaterializedField field) {
* <p>
* By allowing the non-critical metadata to change, we preserve the
* child relationships as a list or union evolves.
- * @param type
+ * @param newType
*/
public void replaceType(MajorType newType) {
@@ -190,11 +190,20 @@ public int hashCode() {
return Objects.hash(this.name, this.type, this.children);
}
+ /**
+ * Equals method doesn't check for the children list of fields here. When a
batch is sent over network then it is
+ * serialized along with the Materialized Field which also contains
information about the internal vectors like
+ * offset and bits. While deserializing, these vectors are treated as
children of parent vector. If a operator on
+ * receiver side like Sort receives a schema in buildSchema phase and then
later on receives another batch, that
+ * will result in schema change and query will fail. This is because second
batch schema will contain information
+ * about internal vectors like offset and bits which will not be present in
first batch schema. For ref: See
+ * TestSort#testSortWithRepeatedMapWithExchanges
+ *
+ * @param obj
+ * @return
+ */
@Override
public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
if (obj == null) {
return false;
}
@@ -206,7 +215,7 @@ public boolean equals(Object obj) {
// in MapVector$MapTransferPair
return this.name.equalsIgnoreCase(other.name) &&
- Objects.equals(this.type, other.type);
+ Objects.equals(this.type, other.type);
}
/**
@@ -230,6 +239,27 @@ public boolean equals(Object obj) {
* sense.) Operators that want to reconcile two maps that differ only in
* column order need a different comparison.</li>
* </ul>
+ * <ul>
+ * Note: Materialized Field and ValueVector has 1:1 mapping which means for
each ValueVector there is a materialized
+ * field associated with it. So when we replace or add a ValueVector in a
VectorContainer then we create new
+ * Materialized Field object for the new vector. This works fine for
Primitive type ValueVectors but for ValueVector
+ * which are of type {@link
org.apache.drill.exec.vector.complex.AbstractContainerVector} there is some
differences on
+ * how Materialized field and ValueVector objects are updated inside the
container which both ValueVector and
+ * Materialized Field object both mutable.
+ * <p>
+ * For example: For cases of MapVector it can so happen that only the
children field type changed but
+ * the parent Map type and name remained same. In these cases we replace the
children field ValueVector from parent
+ * MapVector inside main batch container, with new type of vector. Thus the
reference of parent MaprVector inside
+ * batch container remains same but the reference of children field
ValueVector stored inside MapVector get's updated.
+ * During this update it also replaces the Materialized field for that
children field which is stored in childrens
+ * list of the parent MapVector Materialized Field.
+ * Since the children list of parent Materialized Field is updated, this
make this class mutable. Hence there should
+ * not be any check for object reference equality here but instead there
should be deep comparison which is what
+ * this method is now performing. Since if we have object reference check
then in above cases it will return true for
+ * 2 Materialized Field object whose children field list is different which
is not correct. Same holds true for
+ * {@link MaterializedField#isEquivalent(MaterializedField)} method.
+ * </p>
+ * </ul>
*
* @param other another field
* @return <tt>true</tt> if the columns are identical according to the
@@ -237,9 +267,6 @@ public boolean equals(Object obj) {
*/
public boolean isEquivalent(MaterializedField other) {
- if (this == other) {
- return true;
- }
if (! name.equalsIgnoreCase(other.name)) {
return false;
}
diff --git
a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
index 36823970d0..1d0e03be34 100644
---
a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++
b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -129,6 +129,9 @@ public boolean allocateNewSafe() {
return (T) existing;
} else if (nullFilled(existing)) {
existing.clear();
+ // Since it's removing old vector and adding new one based on new type,
it should do same for Materialized field,
+ // Otherwise there will be duplicate of same field with same name but
different type.
+ field.removeChild(existing.getField());
create = true;
}
if (create) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Handle Schema change in Unnest And Lateral for unnest field / non-unnest field
> ------------------------------------------------------------------------------
>
> Key: DRILL-6418
> URL: https://issues.apache.org/jira/browse/DRILL-6418
> Project: Apache Drill
> Issue Type: Improvement
> Reporter: Sorabh Hamirwasia
> Assignee: Sorabh Hamirwasia
> Priority: Major
> Labels: ready-to-commit
> Fix For: 1.14.0
>
>
> Handle schema change scenarios for LATERAL and UNNEST when a schema
> change is observed for a non-unnest field or for the unnest field. This should also
> handle the scenario where the UNNEST field is of Repeated Map type and the schema
> change happens only in a child field of the Map. The following issues were
> found:
> 1) There were issues in how Scan treats this kind of data. Suppose it scans
> 2 files: in one, the child field (custKey) of a Map (let's say customer_order)
> is an integer, and in the other, custKey is a string. Scan just replaces the
> custKey ValueVector in its output container, but its schema then contains 2
> fields with the same name but different types.
> 2) Unnest and Lateral detect schema changes across batches by comparing against
> the MaterializedField and BatchSchema references they store. Because these are
> references, any change to the referenced object is also visible through the
> stored reference, so the comparison always returns true (no change detected).
> To fix this, Unnest now keeps a clone of the MaterializedField instead of a
> reference and uses that to detect schema changes. For LATERAL, any OK_NEW_SCHEMA
> from upstream is treated as an actual schema change and setup is done again.
> 3) Since MaterializedField is mutable, the object-identity (this == other)
> shortcut in isEquivalent (and equals) is not correct, so it has been removed.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)