This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit b7d259ba9c8c2b28700c9da33bb97dd79ef04cbc Author: Sorabh Hamirwasia <shamirwa...@maprtech.com> AuthorDate: Tue May 15 14:27:31 2018 -0700 DRILL-6418: Handle Schema change in Unnest And Lateral for unnest field / non-unnest field Note: Changed Lateral to handle non-empty right batch with OK_NEW_SCHEMA closes #1271 --- .../exec/physical/impl/join/LateralJoinBatch.java | 45 +--- .../physical/impl/project/ProjectRecordBatch.java | 2 +- .../physical/impl/unnest/UnnestRecordBatch.java | 6 +- .../org/apache/drill/exec/record/BatchSchema.java | 27 ++- .../apache/drill/exec/record/VectorContainer.java | 9 + .../impl/join/TestLateralJoinCorrectness.java | 252 ++++++++++++++++++++- .../impl/limit/TestLimitBatchEmitOutcome.java | 3 + .../impl/project/TestProjectEmitOutcome.java | 3 + .../unnest/TestUnnestWithLateralCorrectness.java | 8 +- .../drill/exec/record/MaterializedField.java | 43 +++- .../exec/vector/complex/AbstractMapVector.java | 3 + 11 files changed, 347 insertions(+), 54 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index 8ea381b..a09913f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -125,8 +125,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Left side has some records in the batch so let's process right batch childOutcome = processRightBatch(); - // reset the left & right outcomes to OK here and send the empty batch downstream - // Assumption being right side will always send OK_NEW_SCHEMA with empty batch which is what UNNEST will do + // reset the left & right outcomes to OK here and send the empty batch downstream. 
Non-Empty right batch with + // OK_NEW_SCHEMA will be handled in subsequent next call if (childOutcome == OK_NEW_SCHEMA) { leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream; rightUpstream = OK; @@ -344,22 +344,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> switch (leftUpstream) { case OK_NEW_SCHEMA: // This OK_NEW_SCHEMA is received post build schema phase and from left side - // If schema didn't actually changed then just handle it as OK outcome. This is fine since it is not setting - // up any incoming vector references in setupNewSchema. While copying the records it always work on latest - // incoming vector. - if (!isSchemaChanged(left.getSchema(), leftSchema)) { - logger.warn(String.format("New schema received from left side is same as previous known left schema. " + - "Ignoring this schema change. Old Left Schema: %s, New Left Schema: %s", leftSchema, left.getSchema())); - - // Current left batch is empty and schema didn't changed as well, so let's get next batch and loose - // OK_NEW_SCHEMA outcome - processLeftBatchInFuture = false; - if (emptyLeftBatch) { - continue; - } else { - leftUpstream = OK; - } - } else if (outputIndex > 0) { // can only reach here from produceOutputBatch + if (outputIndex > 0) { // can only reach here from produceOutputBatch // This means there is already some records from previous join inside left batch // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call processLeftBatchInFuture = true; @@ -439,20 +424,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. 
So there won't be a // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through // - // Right batch with OK_NEW_SCHEMA is always going to be an empty batch, so let's pass the new schema - // downstream and later with subsequent next() call the join output will be produced - Preconditions.checkState(right.getRecordCount() == 0, - "Right side batch with OK_NEW_SCHEMA is not empty"); - - if (!isSchemaChanged(right.getSchema(), rightSchema)) { - logger.warn(String.format("New schema received from right side is same as previous known right schema. " + - "Ignoring this schema change. Old Right schema: %s, New Right Schema: %s", - rightSchema, right.getSchema())); - continue; - } + // Right batch with OK_NEW_SCHEMA can be non-empty so update the rightJoinIndex correctly and pass the + // new schema downstream with empty batch and later with subsequent next() call the join output will be + // produced if (handleSchemaChange()) { container.setRecordCount(0); - rightJoinIndex = -1; + rightJoinIndex = (right.getRecordCount() > 0) ? 
0 : -1; return OK_NEW_SCHEMA; } else { return STOP; @@ -637,10 +614,10 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> container.buildSchema(BatchSchema.SelectionVectorMode.NONE); batchMemoryManager.updateOutgoingStats(outputIndex); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); - logger.debug("Number of records emitted: " + outputIndex); - } + + logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); + logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", outputIndex, + container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation()); // Update the output index for next output batch to zero outputIndex = 0; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index eab9007..8a88db9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -526,7 +526,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { @Override protected boolean setupNewSchema() throws SchemaChangeException { setupNewSchemaFromInput(this.incoming); - if (container.isSchemaChanged()) { + if (container.isSchemaChanged() || callBack.getSchemaChangedAndReset()) { container.buildSchema(SelectionVectorMode.NONE); return true; } else { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java index fe91fc3..ed5d91c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java @@ -389,11 +389,15 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO final MaterializedField thisField = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]); final MaterializedField prevField = unnestFieldMetadata; Preconditions.checkNotNull(thisField); - unnestFieldMetadata = thisField; + // isEquivalent may return false if the order of the fields has changed. This usually does not // happen but if it does we end up throwing a spurious schema change exeption if (prevField == null || !prevField.isEquivalent(thisField)) { logger.debug("Schema changed"); + // We should store the clone of MaterializedField for unnest column instead of reference. When the column is of + // type Map and there is change in any children field of the Map then that will update the reference variable and + // isEquivalent check will still return true. + unnestFieldMetadata = thisField.clone(); return true; } return false; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java index 67598e0..f161234 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java @@ -100,8 +100,29 @@ public class BatchSchema implements Iterable<MaterializedField> { return result; } - // DRILL-5525: the semantics of this method are badly broken. - // Caveat emptor. + /** + * DRILL-5525: the semantics of this method are badly broken. + * Caveat emptor. + * + * This check used for detecting actual schema change inside operator record batch will not work for + * AbstractContainerVectors (like MapVector). 
In each record batch a reference to incoming batch schema is + * stored (let's say S:{a: int}) and then equals is called on that stored reference and current incoming batch schema. + * Internally schema object has references to Materialized fields from vectors in container. If there is change in + * incoming batch schema, then the upstream will create a new ValueVector in its output container with the new + * detected type, which in turn will have new instance for Materialized Field. Then later a new BatchSchema object + * is created for this new incoming batch (let's say S":{a":varchar}). The operator calling equals will have reference + * to old schema object (S) and hence first check will not be satisfied and then it will call equals on each of the + * Materialized Field (a.equals(a")). Since new materialized field is created for newly created vector the equals + * check on field will return false. And schema change will be detected in this case. + * Now consider instead of int vector there is a MapVector such that initial schema was (let's say S:{a:{b:int, c:int}}) + * and then later schema for Map field c changes, then in container Map vector will be found but later the children + * vector for field c will be replaced. This new schema object will be created as (S":{a:{b:int, c":varchar}}). Now + * when S.equals(S") is called it will eventually call a.equals(a) which will return true even though the schema of + * children value vector c has changed. This is because no new vector is created for field (a) and hence its object + * reference to MaterializedField has not changed which will be reflected in both old and new schema instances. + * Hence we should make use of {@link BatchSchema#isEquivalent(BatchSchema)} method instead since + * {@link MaterializedField#isEquivalent(MaterializedField)} method is updated to remove the reference check.
+ */ @Override public boolean equals(Object obj) { @@ -151,7 +172,7 @@ public class BatchSchema implements Iterable<MaterializedField> { /** * Compare that two schemas are identical according to the rules defined - * in {@ link MaterializedField#isEquivalent(MaterializedField)}. In particular, + * in {@link MaterializedField#isEquivalent(MaterializedField)}. In particular, * this method requires that the fields have a 1:1 ordered correspondence * in the two schemas. * diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index e35bb5f..0ea23f7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -135,6 +135,15 @@ public class VectorContainer implements VectorAccessible { return addOrGet(field, null); } + /** + * This method should be called with MaterializedField which also has correct children field list especially when + * the field type is MAP. Otherwise after calling this method if caller is not creating TransferPair on the + * ValueVector, then the new ValueVector will not have information about its list of children MaterializedField.
+ * @param field + * @param callBack + * @param <T> + * @return + */ @SuppressWarnings("unchecked") public <T extends ValueVector> T addOrGet(final MaterializedField field, final SchemaChangeCallBack callBack) { final TypedFieldId id = getValueVectorId(SchemaPath.getSimplePath(field.getName())); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java index 51df6e4..e9e9aac 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java @@ -45,6 +45,7 @@ import java.util.ArrayList; import java.util.List; import static junit.framework.TestCase.fail; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @Category(OperatorTest.class) @@ -853,8 +854,8 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { } /** - * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that - * correctly and suppresses schema change operation by producing output in same batch created with initial schema. + * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL rebuilds the + * schema each time and sends output in multiple output batches * The schema change was only for columns which are not produced by the UNNEST or right branch. 
* * @throws Exception @@ -904,6 +905,8 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); totalRecordCount += ljBatch.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); assertTrue(totalRecordCount == (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() + leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount())); @@ -922,9 +925,8 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { } /** - * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that - * correctly and suppresses false schema change indication from both left and right branch. It produces output in - * same batch created with initial schema. + * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL correctly + * handles it by re-creating the schema and producing multiple batches of final output * The schema change is for columns common on both left and right side. 
* * @throws Exception @@ -976,6 +978,9 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); totalRecordCount += ljBatch.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); assertTrue(totalRecordCount == (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() + leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount())); @@ -2560,4 +2565,241 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { rightMockBatch.close(); } } + + /** + * Verifies that if a non-empty batch with OK_NEW_SCHEMA is received from right side post buildSchema phase then it + * is handled correctly by sending an empty batch with OK_NEW_SCHEMA and later consuming it to produce actual + * output batch with some data + */ + @Test + public void testPostBuildSchema_OK_NEW_SCHEMA_NonEmptyRightBatch() throws Exception { + // Create left input schema 2 + TupleMetadata leftSchema2 = new SchemaBuilder() + .add("id_left", TypeProtos.MinorType.INT) + .add("cost_left", TypeProtos.MinorType.VARCHAR) + .add("name_left", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + // Create right input schema + TupleMetadata rightSchema2 = new SchemaBuilder() + .add("id_right", TypeProtos.MinorType.INT) + .add("cost_right", TypeProtos.MinorType.VARCHAR) + .add("name_right", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + + // Create data for left input + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2) + .addRow(2, "20", "item20") + .build(); + + // Create data for right input + final RowSet.SingleRowSet emptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2) + .build(); + + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2) + .addRow(4, "41", "item41") 
+ .addRow(5, "51", "item51") + .build(); + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + // first OK_NEW_SCHEMA batch + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); // non-empty OK_NEW_SCHEMA batch + rightContainer.add(emptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + int totalRecordCount = 0; + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + totalRecordCount += ljBatch.getRecordCount(); + // This means 2 output record batches were received because of Schema change + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertEquals(0, ljBatch.getRecordCount()); + totalRecordCount += ljBatch.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + totalRecordCount += 
ljBatch.getRecordCount(); + assertTrue(totalRecordCount == + (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() + + leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount())); + + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + leftRowSet2.clear(); + emptyRightRowSet2.clear(); + nonEmptyRightRowSet2.clear(); + } + } + + /** + * Test to verify in case of Multilevel lateral when a non-empty OK_NEW_SCHEMA batch post build schema phase is + * received from right most UNNEST of lower LATERAL then pipeline works fine. + * @throws Exception + */ + @Test + public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() throws Exception { + // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 ** + + // Create left input schema for first batch + TupleMetadata leftSchema2 = new SchemaBuilder() + .add("id_left_new", TypeProtos.MinorType.INT) + .add("cost_left_new", TypeProtos.MinorType.INT) + .add("name_left_new", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + final RowSet.SingleRowSet emptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2).build(); + final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2) + .addRow(6, 60, "item6") + .build(); + + leftContainer.add(emptyLeftRowSet.container()); + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(emptyLeftRowSet_leftSchema2.container()); + leftContainer.add(nonEmptyLeftRowSet_leftSchema2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.EMIT); + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch 
leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + TupleMetadata rightSchema2 = new SchemaBuilder() + .add("id_right_new", TypeProtos.MinorType.INT) + .add("cost_right_new", TypeProtos.MinorType.VARCHAR) + .add("name_right_new", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + final RowSet.SingleRowSet emptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2).build(); + final RowSet.SingleRowSet nonEmptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2) + .addRow(5, "51", "item51") + .addRow(6, "61", "item61") + .addRow(7, "71", "item71") + .build(); + + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet_rightSchema2.container()); // non-empty batch with Ok_new_schema + rightContainer.add(emptyRightRowSet_rightSchema2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL); + + final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(), + leftMockBatch_1, rightMockBatch_1); + + // ** Prepare second pair of left and right batch for upper level Lateral_2 ** + + // Create left input schema for first batch + TupleMetadata leftSchema3 = new SchemaBuilder() + .add("id_left_left", TypeProtos.MinorType.INT) + .add("cost_left_left", TypeProtos.MinorType.INT) + .add("name_left_left", TypeProtos.MinorType.VARCHAR) + 
.buildSchema(); + + final RowSet.SingleRowSet emptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3).build(); + final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3) + .addRow(6, 60, "item6") + .build(); + + // Get left input schema for second left batch + TupleMetadata leftSchema4 = new SchemaBuilder() + .add("id_left_left_new", TypeProtos.MinorType.INT) + .add("cost_left_left_new", TypeProtos.MinorType.VARCHAR) + .add("name_left_left_new", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema4 = fixture.rowSetBuilder(leftSchema4) + .addRow(100, "100", "item100") + .build(); + + // Build Left container for upper level LATERAL operator + final List<VectorContainer> leftContainer2 = new ArrayList<>(5); + + // Get the left container with dummy data + leftContainer2.add(emptyLeftRowSet_leftSchema3.container()); + leftContainer2.add(nonEmptyLeftRowSet_leftSchema3.container()); + leftContainer2.add(nonEmptyLeftRowSet_leftSchema4.container()); + + // Get the left container outcomes for upper level LATERAL operator + final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5); + leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes2.add(RecordBatch.IterOutcome.OK); + leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema()); + + final LateralJoinBatch upperLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(), + leftMockBatch_2, lowerLevelLateral); + + try { + // 3 for first batch on left side and another 3 for next left batch + final int expectedOutputRecordCount = 6; + int actualOutputRecordCount = 0; + + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next()); + assertTrue(RecordBatch.IterOutcome.OK == 
upperLevelLateral.next()); + actualOutputRecordCount += upperLevelLateral.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next()); + actualOutputRecordCount += upperLevelLateral.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next()); + actualOutputRecordCount += upperLevelLateral.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next()); + actualOutputRecordCount += upperLevelLateral.getRecordCount(); + assertTrue(RecordBatch.IterOutcome.NONE == upperLevelLateral.next()); + assertTrue(actualOutputRecordCount == expectedOutputRecordCount); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + upperLevelLateral.close(); + leftMockBatch_2.close(); + lowerLevelLateral.close(); + leftMockBatch_1.close(); + rightMockBatch_1.close(); + leftContainer2.clear(); + leftOutcomes2.clear(); + } + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java index 4757488..38c0f51 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java @@ -17,16 +17,19 @@ */ package org.apache.drill.exec.physical.impl.limit; +import org.apache.drill.categories.OperatorTest; import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.impl.MockRecordBatch; import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.test.rowSet.RowSet; import org.junit.Test; +import org.junit.experimental.categories.Category; import static org.junit.Assert.assertEquals; import static 
org.junit.Assert.assertTrue; +@Category(OperatorTest.class) public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome { /** diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java index b3099d0..cc737e9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java @@ -17,17 +17,20 @@ */ package org.apache.drill.exec.physical.impl.project; +import org.apache.drill.categories.OperatorTest; import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.impl.MockRecordBatch; import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.test.rowSet.RowSet; import org.junit.Test; +import org.junit.experimental.categories.Category; import static junit.framework.TestCase.fail; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +@Category(OperatorTest.class) public class TestProjectEmitOutcome extends BaseTestOpBatchEmitOutcome { /** diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java index ec043b2..70a32f8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java @@ -223,8 +223,12 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { Object[][][] baseline = { { - {1, 1, 2, 2, 2, 3, 3, 4}, - {"0", "1", "2", "3", "4", "5", 
"6", "9"} + {1, 1, 2, 2, 2, 3, 3}, + {"0", "1", "2", "3", "4", "5", "6"} + }, + { + {4}, + {"9"} } }; diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java index fa4d276..672bb7e 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java @@ -111,7 +111,7 @@ public class MaterializedField { * <p> * By allowing the non-critical metadata to change, we preserve the * child relationships as a list or union evolves. - * @param type + * @param newType */ public void replaceType(MajorType newType) { @@ -190,11 +190,20 @@ public class MaterializedField { return Objects.hash(this.name, this.type, this.children); } + /** + * Equals method doesn't check for the children list of fields here. When a batch is sent over network then it is + * serialized along with the Materialized Field which also contains information about the internal vectors like + * offset and bits. While deserializing, these vectors are treated as children of parent vector. If a operator on + * receiver side like Sort receives a schema in buildSchema phase and then later on receives another batch, that + * will result in schema change and query will fail. This is because second batch schema will contain information + * about internal vectors like offset and bits which will not be present in first batch schema. 
For ref: See + * TestSort#testSortWithRepeatedMapWithExchanges + * + * @param obj + * @return + */ @Override public boolean equals(Object obj) { - if (this == obj) { - return true; - } if (obj == null) { return false; } @@ -206,7 +215,7 @@ public class MaterializedField { // in MapVector$MapTransferPair return this.name.equalsIgnoreCase(other.name) && - Objects.equals(this.type, other.type); + Objects.equals(this.type, other.type); } /** @@ -230,6 +239,27 @@ public class MaterializedField { * sense.) Operators that want to reconcile two maps that differ only in * column order need a different comparison.</li> * </ul> + * <ul> + * Note: Materialized Field and ValueVector has 1:1 mapping which means for each ValueVector there is a materialized + * field associated with it. So when we replace or add a ValueVector in a VectorContainer then we create new + * Materialized Field object for the new vector. This works fine for Primitive type ValueVectors but for ValueVector + * which are of type {@link org.apache.drill.exec.vector.complex.AbstractContainerVector} there are some differences on + * how Materialized field and ValueVector objects are updated inside the container, since both ValueVector and + * Materialized Field objects are mutable. + * <p> + * For example: For cases of MapVector it can so happen that only the children field type changed but + * the parent Map type and name remained same. In these cases we replace the children field ValueVector from parent + * MapVector inside main batch container, with new type of vector. Thus the reference of parent MapVector inside + * batch container remains same but the reference of children field ValueVector stored inside MapVector gets updated. + * During this update it also replaces the Materialized field for that children field which is stored in children + * list of the parent MapVector Materialized Field. + * Since the children list of parent Materialized Field is updated, this makes this class mutable.
Hence there should + * not be any check for object reference equality here but instead there should be deep comparison which is what + * this method is now performing. Since if we have object reference check then in above cases it will return true for + * 2 Materialized Field object whose children field list is different which is not correct. Same holds true for + * {@link MaterializedField#isEquivalent(MaterializedField)} method. + * </p> + * </ul> * * @param other another field * @return <tt>true</tt> if the columns are identical according to the @@ -237,9 +267,6 @@ public class MaterializedField { */ public boolean isEquivalent(MaterializedField other) { - if (this == other) { - return true; - } if (! name.equalsIgnoreCase(other.name)) { return false; } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java index 3682397..1d0e03b 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java @@ -129,6 +129,9 @@ public abstract class AbstractMapVector extends AbstractContainerVector { return (T) existing; } else if (nullFilled(existing)) { existing.clear(); + // Since it's removing old vector and adding new one based on new type, it should do same for Materialized field, + // Otherwise there will be duplicate of same field with same name but different type. + field.removeChild(existing.getField()); create = true; } if (create) { -- To stop receiving notification emails like this one, please contact ar...@apache.org.