http://git-wip-us.apache.org/repos/asf/drill/blob/8d562d07/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
new file mode 100644
index 0000000..4c6b4c0
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.join.LateralJoinBatch;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.physical.rowSet.impl.TestResultSetLoaderMapArray;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.store.mock.MockStorePOP;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.TestCase.fail;
+
+@Category(OperatorTest.class)
+public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
+
+
+ // Operator Context for mock batch
+ public static OperatorContext operatorContext;
+
+ public static PhysicalOperator mockPopConfig;
+ public static LateralJoinPOP ljPopConfig;
+
+
+ @BeforeClass public static void setUpBeforeClass() throws Exception {
+ mockPopConfig = new MockStorePOP(null);
+ ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.FULL);
+ operatorContext = fixture.newOperatorContext(mockPopConfig);
+ }
+
+ @AfterClass public static void tearDownAfterClass() throws Exception {
+ operatorContext.close();
+ }
+
+ @Test
+ public void testUnnestFixedWidthColumn() {
+
+ Object[][] data = {
+ { (Object) new int[] {1, 2},
+ (Object) new int[] {3, 4, 5}},
+ { (Object) new int[] {6, 7, 8, 9},
+ (Object) new int[] {10, 11, 12, 13, 14}}
+ };
+
+ // Create input schema
+ TupleMetadata incomingSchema =
+ new SchemaBuilder()
+ .add("rowNumber", TypeProtos.MinorType.INT)
+ .addArray("unnestColumn", TypeProtos.MinorType.INT)
+ .buildSchema();
+ TupleMetadata[] incomingSchemas = { incomingSchema, incomingSchema };
+
+ Integer[][][] baseline = {
+ {
+ {1, 1, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4}, //rowNum
+ {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14} // unnestColumn_flat
+ }
+ };
+
+ RecordBatch.IterOutcome[] iterOutcomes =
{RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+ try {
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ } catch (Exception e) {
+ fail("Failed due to exception: " + e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testUnnestVarWidthColumn() {
+
+ Object[][] data = {
+ { (Object) new String[] {"", "zero"},
+ (Object) new String[] {"one", "two", "three"}},
+ { (Object) new String[] {"four", "five", "six", "seven"},
+ (Object) new String[] {"eight", "nine", "ten", "eleven", "twelve"}}
+ };
+
+ // Create input schema
+ TupleMetadata incomingSchema = new SchemaBuilder()
+ .add("someColumn", TypeProtos.MinorType.INT)
+ .addArray("unnestColumn", TypeProtos.MinorType.VARCHAR).buildSchema();
+ TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
+
+ Object[][][] baseline = {
+ {
+ {1, 1, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4}, // rowNum
+ {"", "zero", "one", "two", "three", "four", "five", "six", "seven",
"eight", "nine", "ten", "eleven",
+ "twelve"} // unnestColumn_flat
+ }
+ };
+
+ RecordBatch.IterOutcome[] iterOutcomes =
{RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+ try {
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ } catch (Exception e) {
+ fail("Failed due to exception: " + e.getMessage());
+ }
+
+ }
+
+ @Ignore("RecordBatchSizer throws Exception in RecordBatchSizer.expandMap")
+ @Test
+ public void testUnnestMapColumn() {
+
+ Object[][] data = getMapData();
+
+ // Create input schema
+ TupleMetadata incomingSchema = getRepeatedMapSchema();
+ TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
+
+ Object[][][] baseline = getMapBaseline();
+
+ RecordBatch.IterOutcome[] iterOutcomes =
{RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+ try {
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ } catch (Exception e) {
+ fail("Failed due to exception: " + e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testUnnestEmptyList() {
+
+ Object[][] data = {
+ { (Object) new String[] {},
+ (Object) new String[] {}
+ },
+ { (Object) new String[] {},
+ (Object) new String[] {}
+ }
+ };
+
+ // Create input schema
+ TupleMetadata incomingSchema = new SchemaBuilder()
+ .add("someColumn", TypeProtos.MinorType.INT)
+ .addArray("unnestColumn", TypeProtos.MinorType.VARCHAR).buildSchema();
+ TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
+
+ // All batches are empty
+ String[][][] baseline = {{{}}};
+
+ RecordBatch.IterOutcome[] iterOutcomes =
{RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+ try {
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ } catch (Exception e) {
+ fail("Failed due to exception: " + e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testUnnestMultipleNewSchemaIncoming() {
+
+ // Schema changes in incoming have no effect on unnest unless the type of
the
+ // unnest column itself has changed
+ Object[][] data = {
+ {
+ (Object) new String[] {"0", "1"},
+ (Object) new String[] {"2", "3", "4"}
+ },
+ {
+ (Object) new String[] {"5", "6" },
+ },
+ {
+ (Object) new String[] {"9"}
+ }
+ };
+
+ // Create input schema
+ TupleMetadata incomingSchema = new SchemaBuilder()
+ .add("someColumn", TypeProtos.MinorType.INT)
+ .addArray("unnestColumn", TypeProtos.MinorType.VARCHAR).buildSchema();
+ TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema,
incomingSchema};
+
+ Object[][][] baseline = {
+ {
+ {1, 1, 2, 2, 2, 3, 3, 4},
+ {"0", "1", "2", "3", "4", "5", "6", "9"}
+ }
+ };
+
+ RecordBatch.IterOutcome[] iterOutcomes = {
+ RecordBatch.IterOutcome.OK_NEW_SCHEMA,
+ RecordBatch.IterOutcome.OK,
+ RecordBatch.IterOutcome.OK_NEW_SCHEMA};
+
+ try {
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ } catch (Exception e) {
+ fail("Failed due to exception: " + e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testUnnestSchemaChange() {
+ Object[][] data = {
+ {
+ (Object) new String[] {"0", "1"},
+ (Object) new String[] {"2", "3", "4"}
+ },
+ {
+ (Object) new String[] {"5", "6" },
+ },
+ {
+ (Object) new int[] {9}
+ }
+ };
+
+ // Create input schema
+ TupleMetadata incomingSchema1 = new SchemaBuilder()
+ .add("someColumn", TypeProtos.MinorType.INT)
+ .addArray("unnestColumn", TypeProtos.MinorType.VARCHAR).buildSchema();
+ TupleMetadata incomingSchema2 = new SchemaBuilder()
+ .add("someColumn", TypeProtos.MinorType.INT)
+ .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
+
+ TupleMetadata[] incomingSchemas = {incomingSchema1, incomingSchema1,
incomingSchema2};
+
+ Object[][][] baseline = {
+ {
+ {1, 1, 2, 2, 2, 3, 3},
+ {"0", "1", "2", "3", "4", "5", "6"}
+ },
+ {
+ {4},
+ {9}
+ }
+ };
+
+ RecordBatch.IterOutcome[] iterOutcomes = {
+ RecordBatch.IterOutcome.OK_NEW_SCHEMA,
+ RecordBatch.IterOutcome.OK,
+ RecordBatch.IterOutcome.OK_NEW_SCHEMA};
+
+ try {
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ } catch (Exception e) {
+ fail("Failed due to exception: " + e.getMessage());
+ }
+
+ }
+
+ @Ignore ("Batch limits need to be sync'd with tthe record batch sizer. Fix
once the calulations are stabilized")
+ @Test
+ public void testUnnestLimitBatchSize() {
+
+ final int limitedOutputBatchSize = 1024;
+ final int inputBatchSize = 1024+1;
+ final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize);
// num rows * (size of int + size of
+
// int + size of int * num entries in
+
// array)
+ // single record batch with single row. The unnest column has one
+ // more record than the batch size we want in the output
+ Object[][] data = new Object[1][1];
+
+ for (int i = 0; i < data.length; i++) {
+ for (int j = 0; j < data[i].length; j++) {
+ data[i][j] = new int[inputBatchSize];
+ for (int k =0; k < inputBatchSize; k++) {
+ ((int[])data[i][j])[k] = k;
+ }
+ }
+ }
+ Integer[][][] baseline = new Integer[2][2][];
+ baseline[0][0] = new Integer[limitedOutputBatchSize];
+ baseline[0][1] = new Integer[limitedOutputBatchSize];
+ baseline[1][0] = new Integer[1];
+ baseline[1][1] = new Integer[1];
+ for (int i = 0; i < limitedOutputBatchSize; i++) {
+ baseline[0][0][i] = 1;
+ baseline[0][1][i] = i;
+ }
+ baseline[1][0][0] = 1; // row Num
+ baseline[1][1][0] = limitedOutputBatchSize; // value
+
+ // Create input schema
+ TupleMetadata incomingSchema = new SchemaBuilder()
+ .add("rowNumber", TypeProtos.MinorType.INT)
+ .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
+
+ TupleMetadata[] incomingSchemas = {incomingSchema};
+
+ RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK};
+
+ final long outputBatchSize =
fixture.getFragmentContext().getOptions().getOption(ExecConstants
+ .OUTPUT_BATCH_SIZE_VALIDATOR);
+
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE,
limitedOutputBatchSizeBytes);
+
+ try {
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ } catch (Exception e) {
+ fail("Failed due to exception: " + e.getMessage());
+ } finally {
+
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE,
outputBatchSize);
+ }
+
+ }
+
+ @Test
+ // Limit sends a kill. Unnest has more than one record batch for a record
when
+ // the kill is sent.
+ public void testUnnestKillFromLimitSubquery1() {
+
+ // similar to previous test; we split a record across more than one batch.
+ // but we also set a limit less than the size of the batch so only one
batch gets output.
+ final int limitedOutputBatchSize = 1024;
+ final int inputBatchSize = 1024+1;
+ final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize);
// num rows * (size of int + size of
+
// int + size of int * num entries in
+
// array)
+ // single record batch with single row. The unnest column has one
+ // more record than the batch size we want in the output
+ Object[][] data = new Object[1][1];
+
+ for (int i = 0; i < data.length; i++) {
+ for (int j = 0; j < data[i].length; j++) {
+ data[i][j] = new int[inputBatchSize];
+ for (int k =0; k < inputBatchSize; k++) {
+ ((int[])data[i][j])[k] = k;
+ }
+ }
+ }
+
+ // because of kill we only get one batch back
+ Integer[][][] baseline = new Integer[1][2][];
+ baseline[0][0] = new Integer[limitedOutputBatchSize];
+ baseline[0][1] = new Integer[limitedOutputBatchSize];
+ for (int i = 0; i < limitedOutputBatchSize; i++) {
+ baseline[0][0][i] = 1;
+ baseline[0][1][i] = i;
+ }
+
+ // Create input schema
+ TupleMetadata incomingSchema = new SchemaBuilder()
+ .add("rowNumber", TypeProtos.MinorType.INT)
+ .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
+
+ TupleMetadata[] incomingSchemas = {incomingSchema};
+
+ RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK};
+
+ final long outputBatchSize =
fixture.getFragmentContext().getOptions().getOption(ExecConstants
+ .OUTPUT_BATCH_SIZE_VALIDATOR);
+
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE,
limitedOutputBatchSizeBytes);
+
+ try {
+ testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); //
Limit of 100 values for unnest.
+ } catch (Exception e) {
+ fail("Failed due to exception: " + e.getMessage());
+ } finally {
+
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE,
outputBatchSize);
+ }
+
+ }
+
+ @Test
+ // Limit sends a kill. Unnest has exactly one record batch for a record when
+ // the kill is sent. This test is actually useless since it tests the
behaviour of
+ // lateral which doesn't send kill at all if it gets an EMIT. We expect limit
+ // to do so, so let's keep the test to demonstrate the expected behaviour.
+ public void testUnnestKillFromLimitSubquery2() {
+
+ // similar to previous test but the size of the array fits exactly into
the record batch;
+
+ final int limitedOutputBatchSize = 1024;
+ final int inputBatchSize = 1024;
+ final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize);
// num rows * (size of int + size of
+
// int + size of int * num entries in
+
// array)
+ // single record batch with single row. The unnest column has one
+ // more record than the batch size we want in the output
+ Object[][] data = new Object[1][1];
+
+ for (int i = 0; i < data.length; i++) {
+ for (int j = 0; j < data[i].length; j++) {
+ data[i][j] = new int[inputBatchSize];
+ for (int k =0; k < inputBatchSize; k++) {
+ ((int[])data[i][j])[k] = k;
+ }
+ }
+ }
+
+ // because of kill we only get one batch back
+ Integer[][][] baseline = new Integer[1][2][];
+ baseline[0][0] = new Integer[limitedOutputBatchSize];
+ baseline[0][1] = new Integer[limitedOutputBatchSize];
+ for (int i = 0; i < limitedOutputBatchSize; i++) {
+ baseline[0][0][i] = 1;
+ baseline[0][1][i] = i;
+ }
+
+ // Create input schema
+ TupleMetadata incomingSchema = new SchemaBuilder()
+ .add("rowNumber", TypeProtos.MinorType.INT)
+ .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
+
+ TupleMetadata[] incomingSchemas = {incomingSchema};
+
+ RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK};
+
+ final long outputBatchSize =
fixture.getFragmentContext().getOptions().getOption(ExecConstants
+ .OUTPUT_BATCH_SIZE_VALIDATOR);
+
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE,
limitedOutputBatchSizeBytes);
+
+ try {
+ testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); //
Limit of 100 values for unnest.
+ } catch (Exception e) {
+ fail("Failed due to exception: " + e.getMessage());
+ } finally {
+
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE,
outputBatchSize);
+ }
+
+ }
+
+
+ @Test
+ public void testUnnestNonArrayColumn() {
+
+ Object[][] data = {
+ { (Object) new Integer (1),
+ (Object) new Integer (3)},
+ { (Object) new Integer (6),
+ (Object) new Integer (10)}
+ };
+
+ // Create input schema
+ TupleMetadata incomingSchema =
+ new SchemaBuilder()
+ .add("rowNumber", TypeProtos.MinorType.INT)
+ .add("unnestColumn", TypeProtos.MinorType.INT)
+ .buildSchema();
+ TupleMetadata[] incomingSchemas = { incomingSchema, incomingSchema };
+
+ // We expect an Exception
+ Integer[][][] baseline = {};
+
+ RecordBatch.IterOutcome[] iterOutcomes =
{RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+ try {
+ testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+ } catch (UserException|UnsupportedOperationException e) {
+ return; // succeeded
+ } catch (Exception e) {
+ fail("Failed due to exception: " + e.getMessage());
+ }
+
+ }
+
+
+ // test unnest for various input conditions without invoking kill
+ private <T> void testUnnest(
+ TupleMetadata[] incomingSchemas,
+ RecordBatch.IterOutcome[] iterOutcomes,
+ T[][] data,
+ T[][][] baseline ) throws Exception{
+ testUnnest(incomingSchemas, iterOutcomes, -1, -1, data, baseline);
+ }
+
+ // test unnest for various input conditions optionally invoking kill. if the
kill or killBatch
+ // parameter is greater than 0 then the record batch is sent a kill after
that many batches have been processed
+ private <T> void testUnnest( TupleMetadata[] incomingSchemas,
+ RecordBatch.IterOutcome[] iterOutcomes,
+ int unnestLimit, // kill unnest after every 'unnestLimit' number of
values in every record
+ int execKill, // number of batches after which to kill the execution (!)
+ T[][] data,
+ T[][][] baseline) throws Exception {
+
+ // Get the incoming container with dummy data for LJ
+ final List<VectorContainer> incomingContainer = new
ArrayList<>(data.length);
+
+ // Create data
+ ArrayList<RowSet.SingleRowSet> rowSets = new ArrayList<>();
+ int rowNumber = 0;
+ int batchNum = 0;
+ for ( Object[] recordBatch : data) {
+ RowSetBuilder rowSetBuilder =
fixture.rowSetBuilder(incomingSchemas[batchNum]);
+ for ( Object rowData : recordBatch) {
+ rowSetBuilder.addRow(++rowNumber, rowData);
+ }
+ RowSet.SingleRowSet rowSet = rowSetBuilder.build();
+ rowSets.add(rowSet);
+ incomingContainer.add(rowSet.container());
+ batchNum++;
+ }
+
+ // Get the unnest POPConfig
+ final UnnestPOP unnestPopConfig = new UnnestPOP(null,
SchemaPath.getCompoundPath("unnestColumn"));
+
+ // Get the IterOutcomes for LJ
+ final List<RecordBatch.IterOutcome> outcomes = new
ArrayList<>(iterOutcomes.length);
+ for(RecordBatch.IterOutcome o : iterOutcomes) {
+ outcomes.add(o);
+ }
+
+ // Create incoming MockRecordBatch
+ final MockRecordBatch incomingMockBatch =
+ new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
incomingContainer, outcomes,
+ incomingContainer.get(0).getSchema());
+
+ // setup Unnest record batch
+ final UnnestRecordBatch unnestBatch =
+ new UnnestRecordBatch(unnestPopConfig, fixture.getFragmentContext());
+
+ final LateralJoinBatch lateralJoinBatch =
+ new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
incomingMockBatch, unnestBatch);
+
+ // set pointer to Lateral in unnest
+ unnestBatch.setIncoming((LateralContract) lateralJoinBatch);
+
+ // Simulate the pipeline by calling next on the incoming
+
+ // results is an array ot batches, each batch being an array of output
vectors.
+ List<List<ValueVector> > resultList = new ArrayList<>();
+ List<List<ValueVector> > results = null;
+ int batchesProcessed = 0;
+ try{
+ try {
+ while (!isTerminal(lateralJoinBatch.next())) {
+ if (lateralJoinBatch.getRecordCount() > 0) {
+ addBatchToResults(resultList, lateralJoinBatch);
+ }
+ batchesProcessed++;
+ if (batchesProcessed == execKill) {
+ lateralJoinBatch.getContext().getExecutorState().fail(new
DrillException("Testing failure of execution."));
+ lateralJoinBatch.kill(true);
+ }
+ // else nothing to do
+ }
+ } catch (UserException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new Exception ("Test failed to execute lateralJoinBatch.next()
because: " + e.getMessage());
+ }
+
+ // Check results against baseline
+ results = resultList;
+
+ int batchIndex = 0;
+ int vectorIndex = 0;
+ //int valueIndex = 0;
+ for ( List<ValueVector> batch: results) {
+ int vectorCount= batch.size();
+ if (vectorCount!= baseline[batchIndex].length+1) { // baseline does
not include the original unnest column
+ fail("Test failed in validating unnest output. Batch column count
mismatch.");
+ }
+ for (ValueVector vv : batch) {
+ if(vv.getField().getName().equals("unnestColumn")) {
+ continue; // skip the original input column
+ }
+ int valueCount = vv.getAccessor().getValueCount();
+ if (valueCount!= baseline[batchIndex][vectorIndex].length) {
+ fail("Test failed in validating unnest output. Value count
mismatch in batch number " + (batchIndex+1) +""
+ + ".");
+ }
+
+ for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
+ if (vv instanceof MapVector) {
+ if
(!compareMapBaseline(baseline[batchIndex][vectorIndex][valueIndex], vv
+ .getAccessor()
+ .getObject(valueIndex))) {
+ fail("Test failed in validating unnest(Map) output. Value
mismatch");
+ }
+ } else if (vv instanceof VarCharVector) {
+ Object val = vv.getAccessor().getObject(valueIndex);
+ if (((String)
baseline[batchIndex][vectorIndex][valueIndex]).compareTo(val.toString()) != 0) {
+ fail("Test failed in validating unnest output. Value mismatch.
Baseline value[]" + vectorIndex + "][" + valueIndex
+ + "]" + ": " + baseline[vectorIndex][valueIndex] + "
VV.getObject(valueIndex): " + val);
+ }
+ } else {
+ Object val = vv.getAccessor().getObject(valueIndex);
+ if (!baseline[batchIndex][vectorIndex][valueIndex].equals(val)) {
+ fail("Test failed in validating unnest output. Value mismatch.
Baseline value[" + vectorIndex + "][" + valueIndex
+ + "]" + ": "
+ +
((Object[])baseline[batchIndex][vectorIndex])[valueIndex] + "
VV.getObject(valueIndex): " + val);
+ }
+ }
+ }
+ vectorIndex++;
+ }
+ vectorIndex=0;
+ batchIndex++;
+ }
+ } catch (UserException e) {
+ throw e; // Valid exception
+ } catch (Exception e) {
+ fail("Test failed. Exception : " + e.getMessage());
+ } finally {
+ // Close all the resources for this test case
+ unnestBatch.close();
+ lateralJoinBatch.close();
+ incomingMockBatch.close();
+
+ if (results != null) {
+ for (List<ValueVector> batch : results) {
+ for (ValueVector vv : batch) {
+ vv.clear();
+ }
+ }
+ }
+ for(RowSet.SingleRowSet rowSet: rowSets) {
+ rowSet.clear();
+ }
+ }
+
+ }
+
+ /**
+ * Build a schema with a repeated map -
+ *
+ * {
+ * rowNum,
+ * mapColumn : [
+ * {
+ * colA,
+ * colB : [
+ * varcharCol
+ * ]
+ * }
+ * ]
+ * }
+ *
+ * @see TestResultSetLoaderMapArray TestResultSetLoaderMapArray for similar
schema and data
+ * @return TupleMetadata corresponding to the schema
+ */
+ private TupleMetadata getRepeatedMapSchema() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("rowNum", TypeProtos.MinorType.INT)
+ .addMapArray("unnestColumn")
+ .add("colA", TypeProtos.MinorType.INT)
+ .addArray("colB", TypeProtos.MinorType.VARCHAR)
+ .resumeSchema()
+ .buildSchema();
+ return schema;
+ }
+
+ private Object[][] getMapData( ) {
+
+ Object[][] d = {
+ {
+ new Object[] {},
+ new Object[] {
+ new Object[] {11, new String[] {"1.1.1", "1.1.2" }},
+ new Object[] {12, new String[] {"1.2.1", "1.2.2" }}
+ },
+
+ new Object[] {
+ new Object[] {21, new String[] {"2.1.1", "2.1.2" }},
+ new Object[] {22, new String[] {}},
+ new Object[] {23, new String[] {"2.3.1", "2.3.2" }}
+ }
+ },
+ {
+ new Object[] {
+ new Object[] {31, new String[] {"3.1.1", "3.1.2" }},
+ new Object[] {32, new String[] {"3.2.1", "3.2.2" }}
+ }
+ }
+ };
+
+ return d;
+ }
+
+ private Object[][][] getMapBaseline() {
+
+ Object[][][] d = {
+ {
+ {2,2,3,3,3,4,4},
+ {
+ "{\"colA\":11,\"colB\":[\"1.1.1\",\"1.1.2\"]}",
+ "{\"colA\":12,\"colB\":[\"1.2.1\",\"1.2.2\"]}",
+ "{\"colA\":21,\"colB\":[\"2.1.1\",\"2.1.2\"]}",
+ "{\"colA\":22,\"colB\":[]}",
+ "{\"colA\":23,\"colB\":[\"2.3.1\",\"2.3.2\"]}",
+ "{\"colA\":31,\"colB\":[\"3.1.1\",\"3.1.2\"]}",
+ "{\"colA\":32,\"colB\":[\"3.2.1\",\"3.2.2\"]}"
+ }
+ }
+ };
+ return d;
+ }
+
+ private boolean compareMapBaseline(Object baselineValue, Object vector) {
+ String vv = vector.toString();
+ String b = (String)baselineValue;
+ return vv.equalsIgnoreCase(b);
+ }
+
+ private int addBatchToResults(List<List<ValueVector> > resultList,
RecordBatch inputBatch) {
+ int count = 0;
+ final RecordBatchData batchCopy = new RecordBatchData(inputBatch,
operatorContext.getAllocator());
+ boolean success = false;
+ try {
+ count = batchCopy.getRecordCount();
+ resultList.add(batchCopy.getVectors());
+ success = true;
+ } finally {
+ if (!success) {
+ batchCopy.clear();
+ }
+ }
+ return count;
+ }
+
+ private boolean isTerminal(RecordBatch.IterOutcome outcome) {
+ return (outcome == RecordBatch.IterOutcome.NONE || outcome ==
RecordBatch.IterOutcome.STOP) || (outcome
+ == RecordBatch.IterOutcome.OUT_OF_MEMORY);
+ }
+
+}
+