[ https://issues.apache.org/jira/browse/DRILL-6418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481764#comment-16481764 ]

ASF GitHub Bot commented on DRILL-6418:
---------------------------------------

asfgit closed pull request #1271: DRILL-6418: Handle Schema change in Unnest And Lateral for unnest fie…
URL: https://github.com/apache/drill/pull/1271
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 8ea381bc1c..a09913fd8d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -125,8 +125,8 @@ public IterOutcome innerNext() {
     // Left side has some records in the batch so let's process right batch
     childOutcome = processRightBatch();
 
-    // reset the left & right outcomes to OK here and send the empty batch downstream
-    // Assumption being right side will always send OK_NEW_SCHEMA with empty batch which is what UNNEST will do
+    // reset the left & right outcomes to OK here and send the empty batch downstream. A non-empty right batch with
+    // OK_NEW_SCHEMA will be handled in the subsequent next() call
     if (childOutcome == OK_NEW_SCHEMA) {
       leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
       rightUpstream = OK;
@@ -344,22 +344,7 @@ private IterOutcome processLeftBatch() {
       switch (leftUpstream) {
         case OK_NEW_SCHEMA:
          // This OK_NEW_SCHEMA is received post build schema phase and from left side
-          // If schema didn't actually changed then just handle it as OK outcome. This is fine since it is not setting
-          // up any incoming vector references in setupNewSchema. While copying the records it always work on latest
-          // incoming vector.
-          if (!isSchemaChanged(left.getSchema(), leftSchema)) {
-            logger.warn(String.format("New schema received from left side is same as previous known left schema. " +
-              "Ignoring this schema change. Old Left Schema: %s, New Left Schema: %s", leftSchema, left.getSchema()));
-
-            // Current left batch is empty and schema didn't changed as well, so let's get next batch and loose
-            // OK_NEW_SCHEMA outcome
-            processLeftBatchInFuture = false;
-            if (emptyLeftBatch) {
-              continue;
-            } else {
-              leftUpstream = OK;
-            }
-          } else if (outputIndex > 0) { // can only reach here from produceOutputBatch
+          if (outputIndex > 0) { // can only reach here from produceOutputBatch
            // This means there is already some records from previous join inside left batch
            // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call
             processLeftBatchInFuture = true;
@@ -439,20 +424,12 @@ private IterOutcome processRightBatch() {
          // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
          // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through
           //
-          // Right batch with OK_NEW_SCHEMA is always going to be an empty batch, so let's pass the new schema
-          // downstream and later with subsequent next() call the join output will be produced
-          Preconditions.checkState(right.getRecordCount() == 0,
-            "Right side batch with OK_NEW_SCHEMA is not empty");
-
-          if (!isSchemaChanged(right.getSchema(), rightSchema)) {
-            logger.warn(String.format("New schema received from right side is same as previous known right schema. " +
-              "Ignoring this schema change. Old Right schema: %s, New Right Schema: %s",
-              rightSchema, right.getSchema()));
-            continue;
-          }
+          // A right batch with OK_NEW_SCHEMA can be non-empty, so update the rightJoinIndex correctly and pass the
+          // new schema downstream with an empty batch; the join output will be produced in the subsequent next()
+          // call
           if (handleSchemaChange()) {
             container.setRecordCount(0);
-            rightJoinIndex = -1;
+            rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
             return OK_NEW_SCHEMA;
           } else {
             return STOP;
@@ -637,10 +614,10 @@ private void finalizeOutputContainer() {
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
     batchMemoryManager.updateOutgoingStats(outputIndex);
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
-      logger.debug("Number of records emitted: " + outputIndex);
-    }
+
+    logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+    logger.debug("Number of records emitted: {} and Allocator Stats: 
[AllocatedMem: {}, PeakMem: {}]", outputIndex,
+      container.getAllocator().getAllocatedMemory(), 
container.getAllocator().getPeakMemoryAllocation());
 
     // Update the output index for next output batch to zero
     outputIndex = 0;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index eab90071ab..8a88db9fef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -526,7 +526,7 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
     setupNewSchemaFromInput(this.incoming);
-    if (container.isSchemaChanged()) {
+    if (container.isSchemaChanged() || callBack.getSchemaChangedAndReset()) {
       container.buildSchema(SelectionVectorMode.NONE);
       return true;
     } else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index fe91fc36a8..ed5d91c22c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -389,11 +389,15 @@ private boolean schemaChanged() {
    final MaterializedField thisField = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]);
     final MaterializedField prevField = unnestFieldMetadata;
     Preconditions.checkNotNull(thisField);
-    unnestFieldMetadata = thisField;
+
    // isEquivalent may return false if the order of the fields has changed. This usually does not
    // happen but if it does we end up throwing a spurious schema change exception
     if (prevField == null || !prevField.isEquivalent(thisField)) {
       logger.debug("Schema changed");
+      // We should store a clone of the MaterializedField for the unnest column instead of a reference. When the
+      // column is of type MAP and any child field of the MAP changes, the referenced object is updated in place,
+      // so the isEquivalent check would still return true.
+      unnestFieldMetadata = thisField.clone();
       return true;
     }
     return false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 67598e0132..f16123443f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -100,8 +100,29 @@ public int hashCode() {
     return result;
   }
 
-  // DRILL-5525: the semantics of this method are badly broken.
-  // Caveat emptor.
+  /**
+   * DRILL-5525: the semantics of this method are badly broken. Caveat emptor.
+   *
+   * This check, used to detect an actual schema change inside an operator record batch, does not work for
+   * AbstractContainerVectors (like MapVector). Each record batch stores a reference to the incoming batch schema
+   * (say S:{a: int}) and equals is called on that stored reference and the current incoming batch schema.
+   * Internally the schema object holds references to the MaterializedFields of the vectors in the container. If
+   * the incoming batch schema changes, the upstream creates a new ValueVector in its output container with the
+   * newly detected type, which in turn gets a new MaterializedField instance. A new BatchSchema object is then
+   * created for this new incoming batch (say S":{a": varchar}). The operator calling equals still holds a
+   * reference to the old schema object (S), so the first check fails and equals is called on each
+   * MaterializedField (a.equals(a")). Since a new MaterializedField was created for the newly created vector, the
+   * field-level equals check returns false and the schema change is detected in this case.
+   * Now suppose that instead of an int vector there is a MapVector, with initial schema (say S:{a:{b: int,
+   * c: int}}), and the schema of map child field c later changes. The MapVector is still found in the container,
+   * but its child vector for field c is replaced, and the new schema object is created as (S":{a:{b: int,
+   * c": varchar}}). When S.equals(S") is called it eventually calls a.equals(a), which returns true even though
+   * the schema of child value vector c has changed. This is because no new vector was created for field (a), so
+   * its reference to its MaterializedField is unchanged and is shared by both the old and the new schema
+   * instances.
+   * Hence {@link BatchSchema#isEquivalent(BatchSchema)} should be used instead, since
+   * {@link MaterializedField#isEquivalent(MaterializedField)} has been updated to remove the reference check.
+   */
 
   @Override
   public boolean equals(Object obj) {
@@ -151,7 +172,7 @@ public boolean equals(Object obj) {
 
   /**
    * Compare that two schemas are identical according to the rules defined
-   * in {@ link MaterializedField#isEquivalent(MaterializedField)}. In particular,
+   * in {@link MaterializedField#isEquivalent(MaterializedField)}. In particular,
    * this method requires that the fields have a 1:1 ordered correspondence
    * in the two schemas.
    *
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index e35bb5f30c..0ea23f709d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -135,6 +135,15 @@ public void transferOut(VectorContainer containerOut) {
     return addOrGet(field, null);
   }
 
+  /**
+   * This method should be called with a MaterializedField that also carries the correct children field list,
+   * especially when the field type is MAP. Otherwise, if after calling this method the caller does not create a
+   * TransferPair on the ValueVector, the new ValueVector will not have information about its list of children
+   * MaterializedFields.
+   * @param field
+   * @param callBack
+   * @param <T>
+   * @return
+   */
   @SuppressWarnings("unchecked")
  public <T extends ValueVector> T addOrGet(final MaterializedField field, final SchemaChangeCallBack callBack) {
    final TypedFieldId id = getValueVectorId(SchemaPath.getSimplePath(field.getName()));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 51df6e4cd5..e9e9aacc6e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -45,6 +45,7 @@
 import java.util.List;
 
 import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 @Category(OperatorTest.class)
@@ -853,8 +854,8 @@ public void testHandlingUnexpectedSchemaChangeForUnnestField() throws Exception
   }
 
   /**
-   * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that
-   * correctly and suppresses schema change operation by producing output in same batch created with initial schema.
+   * When multiple left batches are received with the same schema but with OK_NEW_SCHEMA, LATERAL rebuilds the
+   * schema each time and sends the output in multiple output batches.
    * The schema change was only for columns which are not produced by the UNNEST or right branch.
    *
    * @throws Exception
@@ -904,6 +905,8 @@ public void testOK_NEW_SCHEMA_WithNoActualSchemaChange_ForNonUnnestField() throw
       assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
       assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
       totalRecordCount += ljBatch.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      totalRecordCount += ljBatch.getRecordCount();
       assertTrue(totalRecordCount ==
         (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
           leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
@@ -922,9 +925,8 @@ public void testOK_NEW_SCHEMA_WithNoActualSchemaChange_ForNonUnnestField() throw
   }
 
   /**
-   * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that
-   * correctly and suppresses false schema change indication from both left and right branch. It produces output in
-   * same batch created with initial schema.
+   * When multiple left batches are received with the same schema but with OK_NEW_SCHEMA, LATERAL handles it
+   * correctly by re-creating the schema and producing multiple batches of final output.
    * The schema change is for columns common on both left and right side.
    *
    * @throws Exception
@@ -976,6 +978,9 @@ public void testOK_NEW_SCHEMA_WithNoActualSchemaChange_ForUnnestField() throws E
       assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
       assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
       totalRecordCount += ljBatch.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      totalRecordCount += ljBatch.getRecordCount();
       assertTrue(totalRecordCount ==
         (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
           leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
@@ -2560,4 +2565,241 @@ public void test_OK_NEW_SCHEMAFromLeft_EmitFromRight_PostBuildSchema() throws Ex
       rightMockBatch.close();
     }
   }
+
+  /**
+   * Verifies that if a non-empty batch with OK_NEW_SCHEMA is received from the right side post buildSchema phase,
+   * it is handled correctly by sending an empty batch with OK_NEW_SCHEMA and later consuming the data to produce
+   * the actual output batch
+   */
+  @Test
+  public void testPostBuildSchema_OK_NEW_SCHEMA_NonEmptyRightBatch() throws Exception {
+    // Create left input schema 2
+    TupleMetadata leftSchema2 = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.VARCHAR)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    // Create right input schema
+    TupleMetadata rightSchema2 = new SchemaBuilder()
+      .add("id_right", TypeProtos.MinorType.INT)
+      .add("cost_right", TypeProtos.MinorType.VARCHAR)
+      .add("name_right", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+
+    // Create data for left input
+    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
+      .addRow(2, "20", "item20")
+      .build();
+
+    // Create data for right input
+    final RowSet.SingleRowSet emptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
+      .addRow(4, "41", "item41")
+      .addRow(5, "51", "item51")
+      .build();
+
+    // Get the left container with dummy data for Lateral Join
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(leftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    // first OK_NEW_SCHEMA batch
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container()); // non-empty OK_NEW_SCHEMA batch
+    rightContainer.add(emptyRightRowSet2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      int totalRecordCount = 0;
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      totalRecordCount += ljBatch.getRecordCount();
+      // This means 2 output record batches were received because of Schema change
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertEquals(0, ljBatch.getRecordCount());
+      totalRecordCount += ljBatch.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      totalRecordCount += ljBatch.getRecordCount();
+      assertTrue(totalRecordCount ==
+        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
+          leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } catch (AssertionError | Exception error) {
+      fail();
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      leftRowSet2.clear();
+      emptyRightRowSet2.clear();
+      nonEmptyRightRowSet2.clear();
+    }
+  }
+
+  /**
+   * Verifies that in a multi-level lateral, when a non-empty OK_NEW_SCHEMA batch is received post build schema
+   * phase from the right-most UNNEST of the lower LATERAL, the pipeline works fine.
+   * @throws Exception
+   */
+  @Test
+  public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() throws Exception {
+    // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
+
+    // Create left input schema for first batch
+    TupleMetadata leftSchema2 = new SchemaBuilder()
+      .add("id_left_new", TypeProtos.MinorType.INT)
+      .add("cost_left_new", TypeProtos.MinorType.INT)
+      .add("name_left_new", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2).build();
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2)
+      .addRow(6, 60, "item6")
+      .build();
+
+    leftContainer.add(emptyLeftRowSet.container());
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(emptyLeftRowSet_leftSchema2.container());
+    leftContainer.add(nonEmptyLeftRowSet_leftSchema2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    TupleMetadata rightSchema2 = new SchemaBuilder()
+      .add("id_right_new", TypeProtos.MinorType.INT)
+      .add("cost_right_new", TypeProtos.MinorType.VARCHAR)
+      .add("name_right_new", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2).build();
+    final RowSet.SingleRowSet nonEmptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2)
+      .addRow(5, "51", "item51")
+      .addRow(6, "61", "item61")
+      .addRow(7, "71", "item71")
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet_rightSchema2.container()); // non-empty batch with OK_NEW_SCHEMA
+    rightContainer.add(emptyRightRowSet_rightSchema2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+
+    final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
+      leftMockBatch_1, rightMockBatch_1);
+
+    // ** Prepare second pair of left and right batch for upper level Lateral_2 **
+
+    // Create left input schema for first batch
+    TupleMetadata leftSchema3 = new SchemaBuilder()
+      .add("id_left_left", TypeProtos.MinorType.INT)
+      .add("cost_left_left", TypeProtos.MinorType.INT)
+      .add("name_left_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3).build();
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3)
+      .addRow(6, 60, "item6")
+      .build();
+
+    // Get left input schema for second left batch
+    TupleMetadata leftSchema4 = new SchemaBuilder()
+      .add("id_left_left_new", TypeProtos.MinorType.INT)
+      .add("cost_left_left_new", TypeProtos.MinorType.VARCHAR)
+      .add("name_left_left_new", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema4 = fixture.rowSetBuilder(leftSchema4)
+      .addRow(100, "100", "item100")
+      .build();
+
+    // Build Left container for upper level LATERAL operator
+    final List<VectorContainer> leftContainer2 = new ArrayList<>(5);
+
+    // Get the left container with dummy data
+    leftContainer2.add(emptyLeftRowSet_leftSchema3.container());
+    leftContainer2.add(nonEmptyLeftRowSet_leftSchema3.container());
+    leftContainer2.add(nonEmptyLeftRowSet_leftSchema4.container());
+
+    // Get the left container outcomes for upper level LATERAL operator
+    final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5);
+    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes2.add(RecordBatch.IterOutcome.OK);
+    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema());
+
+    final LateralJoinBatch upperLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
+      leftMockBatch_2, lowerLevelLateral);
+
+    try {
+      // 3 for first batch on left side and another 3 for next left batch
+      final int expectedOutputRecordCount = 6;
+      int actualOutputRecordCount = 0;
+
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
+      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
+      actualOutputRecordCount += upperLevelLateral.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
+      actualOutputRecordCount += upperLevelLateral.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
+      actualOutputRecordCount += upperLevelLateral.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
+      actualOutputRecordCount += upperLevelLateral.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.NONE == upperLevelLateral.next());
+      assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
+    } catch (AssertionError | Exception error) {
+      fail();
+    } finally {
+      // Close all the resources for this test case
+      upperLevelLateral.close();
+      leftMockBatch_2.close();
+      lowerLevelLateral.close();
+      leftMockBatch_1.close();
+      rightMockBatch_1.close();
+      leftContainer2.clear();
+      leftOutcomes2.clear();
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
index 4757488559..38c0f51866 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
@@ -17,16 +17,19 @@
  */
 package org.apache.drill.exec.physical.impl.limit;
 
+import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.test.rowSet.RowSet;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+@Category(OperatorTest.class)
 public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
index b3099d0b24..cc737e9793 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
@@ -17,17 +17,20 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
+import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.test.rowSet.RowSet;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static junit.framework.TestCase.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+@Category(OperatorTest.class)
 public class TestProjectEmitOutcome extends BaseTestOpBatchEmitOutcome {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index ec043b26e8..70a32f8f4d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -223,8 +223,12 @@ public void testUnnestMultipleNewSchemaIncoming() {
 
     Object[][][] baseline = {
         {
-          {1, 1, 2, 2, 2, 3, 3, 4},
-          {"0", "1", "2", "3", "4", "5", "6", "9"}
+          {1, 1, 2, 2, 2, 3, 3},
+          {"0", "1", "2", "3", "4", "5", "6"}
+        },
+        {
+          {4},
+          {"9"}
         }
     };
 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index fa4d2767e7..672bb7e0c4 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -111,7 +111,7 @@ public void removeChild(MaterializedField field) {
    * <p>
    * By allowing the non-critical metadata to change, we preserve the
    * child relationships as a list or union evolves.
-   * @param type
+   * @param newType
    */
 
   public void replaceType(MajorType newType) {
@@ -190,11 +190,20 @@ public int hashCode() {
     return Objects.hash(this.name, this.type, this.children);
   }
 
+  /**
+   * The equals method does not check the children list of fields here. When a batch is sent over the network it
+   * is serialized along with its MaterializedFields, which also contain information about internal vectors like
+   * offsets and bits. While deserializing, these vectors are treated as children of the parent vector. If an
+   * operator on the receiver side, like Sort, receives a schema in the buildSchema phase and then later receives
+   * another batch, that would result in a schema change and the query would fail. This is because the second
+   * batch schema would contain information about internal vectors like offsets and bits which is not present in
+   * the first batch schema. For reference see TestSort#testSortWithRepeatedMapWithExchanges.
+   *
+   * @param obj
+   * @return
+   */
   @Override
   public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
     if (obj == null) {
       return false;
     }
@@ -206,7 +215,7 @@ public boolean equals(Object obj) {
     // in MapVector$MapTransferPair
 
     return this.name.equalsIgnoreCase(other.name) &&
-            Objects.equals(this.type, other.type);
+      Objects.equals(this.type, other.type);
   }
 
   /**
@@ -230,6 +239,27 @@ public boolean equals(Object obj) {
    * sense.) Operators that want to reconcile two maps that differ only in
    * column order need a different comparison.</li>
    * </ul>
+   * <ul>
+   * Note: MaterializedField and ValueVector have a 1:1 mapping, which means that for each ValueVector there is a
+   * MaterializedField associated with it. So when we replace or add a ValueVector in a VectorContainer, we create
+   * a new MaterializedField object for the new vector. This works fine for primitive-type ValueVectors, but for
+   * ValueVectors of type {@link org.apache.drill.exec.vector.complex.AbstractContainerVector} there are some
+   * differences in how the MaterializedField and ValueVector objects are updated inside the container, since both
+   * the ValueVector and the MaterializedField objects are mutable.
+   * <p>
+   * For example: for a MapVector it can happen that only a child field type changed while the parent map type and
+   * name remained the same. In that case we replace the child field ValueVector inside the parent MapVector in the
+   * main batch container with a vector of the new type. The reference to the parent MapVector inside the batch
+   * container stays the same, but the reference to the child field ValueVector stored inside the MapVector gets
+   * updated. During this update the MaterializedField for that child field, stored in the children list of the
+   * parent MapVector's MaterializedField, is replaced as well.
+   * Since the children list of the parent MaterializedField is updated, this class is mutable. Hence there should
+   * be no check for object reference equality here; instead a deep comparison should be performed, which is what
+   * this method now does. With a reference check, the above cases would return true for two MaterializedField
+   * objects whose children field lists differ, which is not correct. The same holds true for
+   * {@link MaterializedField#isEquivalent(MaterializedField)}.
+   * </p>
+   * </ul>
    *
    * @param other another field
    * @return <tt>true</tt> if the columns are identical according to the
@@ -237,9 +267,6 @@ public boolean equals(Object obj) {
    */
 
   public boolean isEquivalent(MaterializedField other) {
-    if (this == other) {
-      return true;
-    }
     if (! name.equalsIgnoreCase(other.name)) {
       return false;
     }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
index 36823970d0..1d0e03be34 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -129,6 +129,9 @@ public boolean allocateNewSafe() {
       return (T) existing;
     } else if (nullFilled(existing)) {
       existing.clear();
+      // Since we are removing the old vector and adding a new one based on the new type, do the same for the
+      // MaterializedField; otherwise there would be a duplicate of the same field with the same name but a
+      // different type.
+      field.removeChild(existing.getField());
       create = true;
     }
     if (create) {
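
To make the aliasing pitfall behind these changes concrete, here is a minimal,
Drill-independent sketch. The Field class and its methods below are hypothetical
stand-ins for Drill's mutable MaterializedField, not its real API. It shows why
storing a reference (rather than a clone) to a mutable field makes schema-change
detection always miss, and why an object-reference shortcut in an equivalence
check is unsafe for mutable objects:

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a mutable field schema; NOT Drill's real MaterializedField API.
class Field {
  final String name;
  String type;                                   // mutable, like a map child whose vector is replaced
  final List<Field> children = new ArrayList<>();

  Field(String name, String type) { this.name = name; this.type = type; }

  // Deep copy: freezes the current state, analogous to UnnestRecordBatch storing thisField.clone()
  Field deepClone() {
    Field copy = new Field(name, type);
    for (Field c : children) { copy.children.add(c.deepClone()); }
    return copy;
  }

  // Deep comparison with no `this == other` shortcut: for mutable objects a reference
  // check would report "equal" even after the shared object was changed in place.
  boolean isEquivalent(Field other) {
    if (!name.equalsIgnoreCase(other.name) || !Objects.equals(type, other.type)) { return false; }
    if (children.size() != other.children.size()) { return false; }
    for (int i = 0; i < children.size(); i++) {
      if (!children.get(i).isEquivalent(other.children.get(i))) { return false; }
    }
    return true;
  }
}

public class SchemaChangeAliasingDemo {
  public static void main(String[] args) {
    Field map = new Field("a", "MAP");
    map.children.add(new Field("c", "INT"));

    Field storedReference = map;          // what the old code effectively kept
    Field storedClone = map.deepClone();  // what the fixed code keeps

    // Upstream replaces the child's type in place: c INT -> VARCHAR
    map.children.get(0).type = "VARCHAR";

    System.out.println(map.isEquivalent(storedReference)); // true  -> change goes undetected
    System.out.println(map.isEquivalent(storedClone));     // false -> change detected
  }
}

With the stored reference, the "old" and "new" schema are the same object, so no
comparison can ever observe a difference; the clone freezes the old state, which
is the behaviour the UnnestRecordBatch change above adopts.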


 


> Handle Schema change in Unnest And Lateral for unnest field / non-unnest field
> ------------------------------------------------------------------------------
>
>                 Key: DRILL-6418
>                 URL: https://issues.apache.org/jira/browse/DRILL-6418
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: Sorabh Hamirwasia
>            Assignee: Sorabh Hamirwasia
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.14.0
>
>
> Handle the schema change scenarios for LATERAL and UNNEST when a schema change 
> is observed for the non-unnest field and the unnest field. It should also handle 
> the scenario where the UNNEST field is of Repeated Map type and the schema change 
> happens only in a child field of the Map. The following issues were found:
> 1) There were issues in how Scan treats this kind of data when it scans 2 files, 
> one with the Map (say customer_order) child field (custKey) typed as integer and 
> the other with custKey typed as string. Scan just replaces the ValueVector for 
> custKey in its output container, but its schema then has 2 fields with the same 
> name and different types.
> 2) Unnest and Lateral check for schema change across batches based on the 
> MaterializedField and BatchSchema references they store. But since these are 
> references, any change in the pointed-to value is visible through the stored 
> reference as well, so the comparison always reports equality. Unnest therefore 
> now keeps a clone of the MaterializedField instead of a reference and uses that 
> to detect schema change. For LATERAL, any OK_NEW_SCHEMA from upstream is treated 
> as an actual schema change and setup is done again.
> 3) Since MaterializedField is mutable, the object-reference equality check in 
> the isEquivalent method was removed, as it is not correct for mutable objects.
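
To illustrate the LATERAL side of the fix, here is a simplified, hypothetical
model in plain Java (not the actual LateralJoinBatch code; all names below are
illustrative) of the pattern the patch adopts for a non-empty right batch that
arrives with OK_NEW_SCHEMA: announce the new schema first with an empty batch,
then emit the held records on the subsequent next() call.

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the "announce schema first, emit data next" pattern.
public class LateralSchemaChangeSketch {
  enum Outcome { OK, OK_NEW_SCHEMA, NONE }

  static final class Batch {
    final Outcome outcome;
    final int recordCount;
    Batch(Outcome outcome, int recordCount) { this.outcome = outcome; this.recordCount = recordCount; }
  }

  private final Deque<Batch> upstream = new ArrayDeque<>();
  // A non-empty OK_NEW_SCHEMA batch parked for later; mirrors the patch setting
  // rightJoinIndex = 0 (instead of -1) so the data is consumed on the subsequent next().
  private Batch held;

  LateralSchemaChangeSketch(Batch... batches) {
    for (Batch b : batches) { upstream.add(b); }
  }

  Batch next() {
    if (held != null) {
      Batch out = new Batch(Outcome.OK, held.recordCount); // emit the parked data with plain OK
      held = null;
      return out;
    }
    Batch in = upstream.poll();
    if (in == null) { return new Batch(Outcome.NONE, 0); }
    if (in.outcome == Outcome.OK_NEW_SCHEMA && in.recordCount > 0) {
      held = in;                                  // park the data...
      return new Batch(Outcome.OK_NEW_SCHEMA, 0); // ...announce the schema with an empty batch
    }
    return in;
  }

  public static void main(String[] args) {
    LateralSchemaChangeSketch op = new LateralSchemaChangeSketch(new Batch(Outcome.OK_NEW_SCHEMA, 2));
    Batch first = op.next();
    Batch second = op.next();
    System.out.println(first.outcome + " / " + first.recordCount);   // OK_NEW_SCHEMA / 0
    System.out.println(second.outcome + " / " + second.recordCount); // OK / 2
  }
}

This matches the assertions in testPostBuildSchema_OK_NEW_SCHEMA_NonEmptyRightBatch
above: an OK_NEW_SCHEMA with zero records, followed by an OK carrying the data.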


