This is an automated email from the ASF dual-hosted git repository. parthc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 069c3049f1a500e5ae0b47caeebc5856ab182b73 Author: Sorabh Hamirwasia <[email protected]> AuthorDate: Fri Jun 29 10:27:55 2018 -0700 DRILL-6561: Lateral excluding the columns from output container provided by projection push into rules This closes #1356 --- .../exec/physical/impl/join/HashJoinBatch.java | 7 +- .../exec/physical/impl/join/LateralJoinBatch.java | 71 +++++++-- .../exec/physical/impl/join/MergeJoinBatch.java | 3 +- .../drill/exec/record/JoinBatchMemoryManager.java | 19 ++- .../impl/join/TestLateralJoinCorrectness.java | 160 ++++++++++++++++++++- .../unnest/TestUnnestWithLateralCorrectness.java | 104 +++++++++----- 6 files changed, 310 insertions(+), 54 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 047c597..345d182 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.join; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -67,9 +68,6 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.calcite.rel.core.JoinRelType; -import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX; -import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX; - /** * This class implements the runtime execution for the Hash-Join operator * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins @@ -892,7 +890,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}", configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize); - batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right); + batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>()); + logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index 84dc5c3..fc3c8b1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.join; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; @@ -27,6 +28,7 @@ import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.LateralContract; +import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.LateralJoinPOP; import org.apache.drill.exec.record.AbstractBinaryRecordBatch; import org.apache.drill.exec.record.BatchSchema; @@ -34,10 +36,14 @@ import org.apache.drill.exec.record.JoinBatchMemoryManager; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatchSizer; +import org.apache.drill.exec.record.SchemaBuilder; import org.apache.drill.exec.record.VectorAccessibleUtilities; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.ValueVector; +import java.util.HashSet; +import java.util.List; + import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; @@ -82,6 +88,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Flag to keep track of new left batch so that update on memory manager is called only once per left batch private boolean isNewLeftBatch = false; + private final HashSet<String> excludedFieldNames = new HashSet<>(); + /* **************************************************************************************************************** * Public Methods * ****************************************************************************************************************/ @@ -91,7 +99,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> Preconditions.checkNotNull(left); Preconditions.checkNotNull(right); final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); - batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right); + // Prepare Schema Path Mapping + populateExcludedField(popConfig); + batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right, excludedFieldNames); // Initially it's set to default value of 64K and later for each new output row it will be set to the computed // row count @@ -700,6 +710,21 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> return isValid; } + private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema) { + if (excludedFieldNames.size() == 0) { + return originSchema; + } + + final SchemaBuilder newSchemaBuilder = + BatchSchema.newBuilder().setSelectionVectorMode(originSchema.getSelectionVectorMode()); + for (MaterializedField field : originSchema) { + if (!excludedFieldNames.contains(field.getName())) { + newSchemaBuilder.addField(field); + } + } + return newSchemaBuilder.build(); + } + /** * Helps to create the outgoing container vectors based on known left and right batch schemas * @throws SchemaChangeException @@ -711,8 +736,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Clear up the container container.clear(); - leftSchema = left.getSchema(); - rightSchema = right.getSchema(); + leftSchema = batchSchemaWithNoExcludedCols(left.getSchema()); + rightSchema = batchSchemaWithNoExcludedCols(right.getSchema()); if (!verifyInputSchema(leftSchema)) { throw new SchemaChangeException("Invalid Schema found for left incoming batch"); @@ -724,12 +749,20 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Setup LeftSchema in outgoing container for (final VectorWrapper<?> vectorWrapper : left) { - container.addOrGet(vectorWrapper.getField()); + final MaterializedField leftField = vectorWrapper.getField(); + if (excludedFieldNames.contains(leftField.getName())) { + continue; + } + container.addOrGet(leftField); } // Setup RightSchema in the outgoing container for (final VectorWrapper<?> vectorWrapper : right) { MaterializedField rightField = vectorWrapper.getField(); + if (excludedFieldNames.contains(rightField.getName())) { + continue; + } + TypeProtos.MajorType rightFieldType = vectorWrapper.getField().getType(); // make right input schema optional if we have LEFT join @@ -846,15 +879,28 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Get the vectors using field index rather than Materialized field since input batch field can be different from // output container field in case of Left Join. As we rebuild the right Schema field to be optional for output // container. + int inputIndex = 0; for (int i = startVectorIndex; i < endVectorIndex; ++i) { - // Get input vector - final Class<?> inputValueClass = batch.getSchema().getColumn(i).getValueClass(); - final ValueVector inputVector = batch.getValueAccessorById(inputValueClass, i).getValueVector(); - // Get output vector final int outputVectorIndex = i + baseVectorIndex; final Class<?> outputValueClass = this.getSchema().getColumn(outputVectorIndex).getValueClass(); final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector(); + final String outputFieldName = outputVector.getField().getName(); + + ValueVector inputVector; + Class<?> inputValueClass; + String inputFieldName; + do { + // Get input vector + inputValueClass = batch.getSchema().getColumn(inputIndex).getValueClass(); + inputVector = batch.getValueAccessorById(inputValueClass, inputIndex).getValueVector(); + inputFieldName = inputVector.getField().getName(); + ++inputIndex; + } while (excludedFieldNames.contains(inputFieldName)); + + Preconditions.checkArgument(outputFieldName.equals(inputFieldName), + new IllegalStateException(String.format("Non-excluded Input and output container fields are not in same order" + + ". Output Schema:%s and Input Schema:%s", this.getSchema(), batch.getSchema()))); logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " + "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and Other: (TimeEachValue: {}," + @@ -938,4 +984,13 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> maxOutputRowCount = newOutputRowCount; } } + + private void populateExcludedField(PhysicalOperator lateralPop) { + final List<SchemaPath> excludedCols = ((LateralJoinPOP)lateralPop).getExcludedColumns(); + if (excludedCols != null) { + for (SchemaPath currentPath : excludedCols) { + excludedFieldNames.add(currentPath.rootName()); + } + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 62967a9..ea34ed9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -62,6 +62,7 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import java.io.IOException; +import java.util.HashSet; import java.util.List; import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; @@ -108,7 +109,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { private class MergeJoinMemoryManager extends JoinBatchMemoryManager { MergeJoinMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) { - super(outputBatchSize, leftBatch, rightBatch); + super(outputBatchSize, leftBatch, rightBatch, new HashSet<>()); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java index 2ebe887..4344e13 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java @@ -17,29 +17,44 @@ */ package org.apache.drill.exec.record; +import java.util.Set; + public class JoinBatchMemoryManager extends RecordBatchMemoryManager { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class); private int rowWidth[]; private RecordBatch recordBatch[]; + private Set<String> columnsToExclude; private static final int numInputs = 2; public static final int LEFT_INDEX = 0; public static final int RIGHT_INDEX = 1; - public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) { + public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, + RecordBatch rightBatch, Set<String> excludedColumns) { super(numInputs, outputBatchSize); recordBatch = new RecordBatch[numInputs]; recordBatch[LEFT_INDEX] = leftBatch; recordBatch[RIGHT_INDEX] = rightBatch; rowWidth = new int[numInputs]; + this.columnsToExclude = excludedColumns; } private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) { updateIncomingStats(inputIndex); rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth(); - final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX]; + // Reduce the width of excluded columns from actual rowWidth + for (String columnName : columnsToExclude) { + final RecordBatchSizer.ColumnSize currentColSizer = getColumnSize(inputIndex, columnName); + if (currentColSizer == null) { + continue; + } + rowWidth[inputIndex] -= currentColSizer.getAllocSizePerEntry(); + } + + // Get final net outgoing row width after reducing the excluded columns width + int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX]; // If outgoing row width is 0 or there is no change in outgoing row width, just return. // This is possible for empty batches or diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java index 2723e30..ffac4b6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java @@ -21,6 +21,7 @@ import avro.shaded.com.google.common.collect.Lists; import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.categories.OperatorTest; import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.OperatorContext; @@ -29,11 +30,13 @@ import org.apache.drill.exec.physical.config.LateralJoinPOP; import org.apache.drill.exec.physical.impl.MockRecordBatch; import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.mock.MockStorePOP; import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.DirectRowSet; import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetComparison; import org.apache.drill.test.rowSet.schema.SchemaBuilder; import org.junit.After; import org.junit.AfterClass; @@ -2870,4 +2873,159 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { leftRowSet2.clear(); } } + + private void testExcludedColumns(List<SchemaPath> excludedCols, CloseableRecordBatch left, + CloseableRecordBatch right, RowSet expectedRowSet) throws Exception { + LateralJoinPOP lateralPop = new LateralJoinPOP(null, null, JoinRelType.INNER, excludedCols); + final LateralJoinBatch ljBatch = new LateralJoinBatch(lateralPop, fixture.getFragmentContext(), left, right); + + try { + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } finally { + ljBatch.close(); + left.close(); + right.close(); + expectedRowSet.clear(); + } + } + + @Test + public void testFillingUpOutputBatch_WithExcludedColumns() throws Exception { + // Create data for left input + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema) + .addRow(2, 20, "item20") + .build(); + + // Create data for right input + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema) + .addRow(4, 41, "item41") + .addRow(5, 51, "item51") + .build(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id_left", TypeProtos.MinorType.INT) + .add("name_left", TypeProtos.MinorType.VARCHAR) + .add("id_right", TypeProtos.MinorType.INT) + .add("cost_right", TypeProtos.MinorType.INT) + .add("name_right", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema) + .addRow(1, "item1", 1, 11, "item11") + .addRow(1, "item1", 2, 21, "item21") + .addRow(1, "item1", 3, 31, "item31") + .addRow(2, "item20", 4, 41, "item41") + .addRow(2, "item20", 5, 51, "item51") + .build(); + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OK); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + List<SchemaPath> excludedCols = new ArrayList<>(); + excludedCols.add(SchemaPath.getSimplePath("cost_left")); + + try { + testExcludedColumns(excludedCols, leftMockBatch, rightMockBatch, expectedRowSet); + } finally { + // Close all the resources for this test case + leftRowSet2.clear(); + nonEmptyRightRowSet2.clear(); + } + } + + @Test + public void testFillingUpOutputBatch_With2ExcludedColumns() throws Exception { + // Create data for left input + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema) + .addRow(2, 20, "item20") + .build(); + + // Create data for right input + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema) + .addRow(4, 41, "item41") + .addRow(5, 51, "item51") + .build(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("name_left", TypeProtos.MinorType.VARCHAR) + //.add("id_right", TypeProtos.MinorType.INT) + .add("cost_right", TypeProtos.MinorType.INT) + .add("name_right", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema) + /*.addRow("item1", 1, 11, "item11") + .addRow("item1", 2, 21, "item21") + .addRow("item1", 3, 31, "item31") + .addRow("item20", 4, 41, "item41") + .addRow("item20", 5, 51, "item51") */ + .addRow("item1", 11, "item11") + .addRow("item1", 21, "item21") + .addRow("item1", 31, "item31") + .addRow("item20", 41, "item41") + .addRow("item20", 51, "item51") + .build(); + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OK); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet2.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + List<SchemaPath> excludedCols = new ArrayList<>(); + excludedCols.add(SchemaPath.getSimplePath("cost_left")); + excludedCols.add(SchemaPath.getSimplePath("id_left")); + excludedCols.add(SchemaPath.getSimplePath("id_right")); + + try { + testExcludedColumns(excludedCols, leftMockBatch, rightMockBatch, expectedRowSet); + } finally { + // Close all the resources for this test case + leftRowSet2.clear(); + nonEmptyRightRowSet2.clear(); + } + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java index 3a7f899..c2e64f4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java @@ -37,8 +37,8 @@ import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.planner.logical.DrillLogicalTestutils; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.mock.MockStorePOP; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VarCharVector; @@ -106,7 +106,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK}; try { - testUnnest(incomingSchemas, iterOutcomes, data, baseline); + testUnnest(incomingSchemas, iterOutcomes, data, baseline, false); } catch (Exception e) { fail("Failed due to exception: " + e.getMessage()); } @@ -140,7 +140,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK}; try { - testUnnest(incomingSchemas, iterOutcomes, data, baseline); + testUnnest(incomingSchemas, iterOutcomes, data, baseline, false); } catch (Exception e) { fail("Failed due to exception: " + e.getMessage()); } @@ -161,7 +161,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK}; try { - testUnnest(incomingSchemas, iterOutcomes, data, baseline); + testUnnest(incomingSchemas, iterOutcomes, data, baseline, false); } catch (Exception e) { fail("Failed due to exception: " + e.getMessage()); } @@ -192,7 +192,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK}; try { - testUnnest(incomingSchemas, iterOutcomes, data, baseline); + testUnnest(incomingSchemas, iterOutcomes, data, baseline, false); } catch (Exception e) { fail("Failed due to exception: " + e.getMessage()); } @@ -240,7 +240,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { RecordBatch.IterOutcome.OK_NEW_SCHEMA}; try { - testUnnest(incomingSchemas, iterOutcomes, data, baseline); + testUnnest(incomingSchemas, iterOutcomes, data, baseline, false); } catch (Exception e) { fail("Failed due to exception: " + e.getMessage()); } @@ -289,28 +289,15 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { RecordBatch.IterOutcome.OK_NEW_SCHEMA}; try { - testUnnest(incomingSchemas, iterOutcomes, data, baseline); + testUnnest(incomingSchemas, iterOutcomes, data, baseline, false); } catch (Exception e) { fail("Failed due to exception: " + e.getMessage()); } } - @Test - public void testUnnestLimitBatchSize() { - - final int limitedOutputBatchSize = 127; - final int inputBatchSize = limitedOutputBatchSize + 1; - // size of lateral output batch = 4N * (N + 5) bytes, where N = output batch row count - // Lateral output batch size = N * input row size + N * size of single unnest column - // = N * (size of row id + size of array offset vector + (N + 1 )*size of single array entry)) - // + N * 4 - // = N * (4 + 2*4 + (N+1)*4 ) + N * 4 - // = N * (16 + 4N) + N * 4 - // = 4N * (N + 5) - // configure the output batch size to be one more record than that so that the batch sizer can round down - final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6); - + private void testUnnestBatchSizing(int inputBatchSize, int limitOutputBatchSize, + int limitOutputBatchSizeBytes, boolean excludeUnnestColumn) { // single record batch with single row. The unnest column has one // more record than the batch size we want in the output Object[][] data = new Object[1][1]; @@ -323,39 +310,76 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { } } } + Integer[][][] baseline = new Integer[2][2][]; - baseline[0][0] = new Integer[limitedOutputBatchSize]; - baseline[0][1] = new Integer[limitedOutputBatchSize]; + baseline[0][0] = new Integer[limitOutputBatchSize]; + baseline[0][1] = new Integer[limitOutputBatchSize]; baseline[1][0] = new Integer[1]; baseline[1][1] = new Integer[1]; - for (int i = 0; i < limitedOutputBatchSize; i++) { + for (int i = 0; i < limitOutputBatchSize; i++) { baseline[0][0][i] = 1; baseline[0][1][i] = i; } baseline[1][0][0] = 1; // row Num - baseline[1][1][0] = limitedOutputBatchSize; // value + baseline[1][1][0] = limitOutputBatchSize; // value // Create input schema TupleMetadata incomingSchema = new SchemaBuilder() - .add("rowNumber", TypeProtos.MinorType.INT) - .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema(); + .add("rowNumber", TypeProtos.MinorType.INT) + .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema(); TupleMetadata[] incomingSchemas = {incomingSchema}; RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK}; final long outputBatchSize = fixture.getFragmentContext().getOptions().getOption(ExecConstants - .OUTPUT_BATCH_SIZE_VALIDATOR); - fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes); + .OUTPUT_BATCH_SIZE_VALIDATOR); + fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitOutputBatchSizeBytes); try { - testUnnest(incomingSchemas, iterOutcomes, data, baseline); + testUnnest(incomingSchemas, iterOutcomes, data, baseline, excludeUnnestColumn); } catch (Exception e) { fail("Failed due to exception: " + e.getMessage()); } finally { fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize); } + } + @Test + public void testUnnestLimitBatchSize_WithExcludedCols() { + LateralJoinPOP previoudPop = ljPopConfig; + List<SchemaPath> excludedCols = new ArrayList<>(); + excludedCols.add(SchemaPath.getSimplePath("unnestColumn")); + ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, excludedCols); + final int limitedOutputBatchSize = 127; + final int inputBatchSize = limitedOutputBatchSize + 1; + // Since we want 127 row count and because of nearest power of 2 adjustment output row count will be reduced to + // 64. So we should configure batch size for (N+1) rows if we want to output N rows where N is not power of 2 + // size of lateral output batch = (N+1)*8 bytes, where N = output batch row count + // Lateral output batch size = (N+1) * (input row size without unnest field) + (N+1) * size of single unnest column + // = (N+1) * (size of row id) + (N+1) * (size of single array entry) + // = (N+1)*4 + (N+1) * 4 + // = (N+1) * 8 + // configure the output batch size to be one more record than that so that the batch sizer can round down + final int limitedOutputBatchSizeBytes = 8 * (limitedOutputBatchSize + 1); + testUnnestBatchSizing(inputBatchSize, limitedOutputBatchSize, limitedOutputBatchSizeBytes, true); + ljPopConfig = previoudPop; + } + + @Test + public void testUnnestLimitBatchSize() { + final int limitedOutputBatchSize = 127; + final int inputBatchSize = limitedOutputBatchSize + 1; + // size of lateral output batch = 4N * (N + 5) bytes, where N = output batch row count + // Lateral output batch size = N * input row size + N * size of single unnest column + // = N * (size of row id + size of array offset vector + (N + 1 )*size of single array entry)) + // + N * 4 + // = N * (4 + 2*4 + (N+1)*4 ) + N * 4 + // = N * (16 + 4N) + N * 4 + // = 4N * (N + 5) + // configure the output batch size to be one more record than that so that the batch sizer can round down + final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6); + testUnnestBatchSizing(inputBatchSize, limitedOutputBatchSize, limitedOutputBatchSizeBytes, false); } @Test @@ -405,7 +429,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes); try { - testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); // Limit of 100 values for unnest. + testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline, false); // Limit of 100 values for unnest. } catch (Exception e) { fail("Failed due to exception: " + e.getMessage()); } finally { @@ -463,7 +487,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes); try { - testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); // Limit of 100 values for unnest. + testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline, false); // Limit of 100 values for unnest. } catch (Exception e) { fail("Failed due to exception: " + e.getMessage()); } finally { @@ -496,7 +520,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK}; try { - testUnnest(incomingSchemas, iterOutcomes, data, baseline); + testUnnest(incomingSchemas, iterOutcomes, data, baseline, false); } catch (UserException|UnsupportedOperationException e) { return; // succeeded } catch (Exception e) { @@ -511,8 +535,9 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { TupleMetadata[] incomingSchemas, RecordBatch.IterOutcome[] iterOutcomes, T[][] data, - T[][][] baseline ) throws Exception{ - testUnnest(incomingSchemas, iterOutcomes, -1, -1, data, baseline); + T[][][] baseline, + boolean excludeUnnestColumn) throws Exception{ + testUnnest(incomingSchemas, iterOutcomes, -1, -1, data, baseline, excludeUnnestColumn); } // test unnest for various input conditions optionally invoking kill. if the kill or killBatch @@ -522,7 +547,8 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { int unnestLimit, // kill unnest after every 'unnestLimit' number of values in every record int execKill, // number of batches after which to kill the execution (!) T[][] data, - T[][][] baseline) throws Exception { + T[][][] baseline, + boolean excludeUnnestColumn) throws Exception { // Get the incoming container with dummy data for LJ final List<VectorContainer> incomingContainer = new ArrayList<>(data.length); @@ -606,7 +632,9 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { //int valueIndex = 0; for ( List<ValueVector> batch: results) { int vectorCount= batch.size(); - if (vectorCount!= baseline[batchIndex].length+1) { // baseline does not include the original unnest column + int expectedVectorCount = (excludeUnnestColumn) ? 0 : 1; + expectedVectorCount += baseline[batchIndex].length; + if (vectorCount!= expectedVectorCount) { // baseline does not include the original unnest column fail("Test failed in validating unnest output. Batch column count mismatch."); } for (ValueVector vv : batch) {
