http://git-wip-us.apache.org/repos/asf/drill/blob/8d562d07/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
new file mode 100644
index 0000000..4c6b4c0
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.join.LateralJoinBatch;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.physical.rowSet.impl.TestResultSetLoaderMapArray;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.store.mock.MockStorePOP;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.TestCase.fail;
+
+@Category(OperatorTest.class)
+public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
+
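+  // These tests hand-build the runtime pipeline that a query of (roughly) this shape
+  // would produce:
+  //
+  //   SELECT t.rowNumber, u.* FROM t, LATERAL (SELECT * FROM UNNEST(t.unnestColumn)) u
+  //
+  // MockRecordBatch plays the incoming scan, LateralJoinBatch drives the pipeline, and
+  // UnnestRecordBatch flattens the array column. The SQL above is only an illustration;
+  // the operators below are wired together directly.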
+
+  // Operator Context for mock batch
+  public static OperatorContext operatorContext;
+
+  public static PhysicalOperator mockPopConfig;
+  public static LateralJoinPOP ljPopConfig;
+
+
+  @BeforeClass public static void setUpBeforeClass() throws Exception {
+    mockPopConfig = new MockStorePOP(null);
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.FULL);
+    operatorContext = fixture.newOperatorContext(mockPopConfig);
+  }
+
+  @AfterClass public static void tearDownAfterClass() throws Exception {
+    operatorContext.close();
+  }
+
+  @Test
+  public void testUnnestFixedWidthColumn() {
+
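+    // data[i][j] is the array value of the unnest column for row j of input batch i;
+    // the testUnnest() helper adds a generated row number as the first column of each row.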
+    Object[][] data = {
+        { (Object) new int[] {1, 2},
+          (Object) new int[] {3, 4, 5}},
+        { (Object) new int[] {6, 7, 8, 9},
+          (Object) new int[] {10, 11, 12, 13, 14}}
+    };
+
+    // Create input schema
+    TupleMetadata incomingSchema =
+        new SchemaBuilder()
+            .add("rowNumber", TypeProtos.MinorType.INT)
+            .addArray("unnestColumn", TypeProtos.MinorType.INT)
+            .buildSchema();
+    TupleMetadata[] incomingSchemas = { incomingSchema, incomingSchema };
+
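+    // baseline[i][j][k]: i = output batch, j = output column (row number, then the
+    // flattened unnest values), k = row within the batch.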
+    Integer[][][] baseline = {
+        {
+          {1, 1, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4}, //rowNum
+          {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14} // unnestColumn_flat
+        }
+    };
+
+    RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+    try {
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+    } catch (Exception e) {
+      fail("Failed due to exception: " + e.getMessage());
+    }
+
+  }
+
+  @Test
+  public void testUnnestVarWidthColumn() {
+
+    Object[][] data = {
+        { (Object) new String[] {"", "zero"},
+          (Object) new String[] {"one", "two", "three"}},
+        { (Object) new String[] {"four", "five", "six", "seven"},
+          (Object) new String[] {"eight", "nine", "ten", "eleven", "twelve"}}
+    };
+
+    // Create input schema
+    TupleMetadata incomingSchema = new SchemaBuilder()
+        .add("someColumn", TypeProtos.MinorType.INT)
+        .addArray("unnestColumn", TypeProtos.MinorType.VARCHAR).buildSchema();
+    TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
+
+    Object[][][] baseline = {
+      {
+        {1, 1, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4}, // rowNum
+        {"", "zero", "one", "two", "three", "four", "five", "six", "seven", 
"eight", "nine", "ten", "eleven",
+            "twelve"} // unnestColumn_flat
+      }
+    };
+
+    RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+    try {
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+    } catch (Exception e) {
+      fail("Failed due to exception: " + e.getMessage());
+    }
+
+  }
+
+  @Ignore("RecordBatchSizer throws Exception in RecordBatchSizer.expandMap")
+  @Test
+  public void testUnnestMapColumn() {
+
+    Object[][] data = getMapData();
+
+    // Create input schema
+    TupleMetadata incomingSchema = getRepeatedMapSchema();
+    TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
+
+    Object[][][] baseline = getMapBaseline();
+
+    RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+    try {
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+    } catch (Exception e) {
+      fail("Failed due to exception: " + e.getMessage());
+    }
+
+  }
+
+  @Test
+  public void testUnnestEmptyList() {
+
+    Object[][] data = {
+        { (Object) new String[] {},
+          (Object) new String[] {}
+        },
+        { (Object) new String[] {},
+          (Object) new String[] {}
+        }
+    };
+
+    // Create input schema
+    TupleMetadata incomingSchema = new SchemaBuilder()
+        .add("someColumn", TypeProtos.MinorType.INT)
+        .addArray("unnestColumn", TypeProtos.MinorType.VARCHAR).buildSchema();
+    TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
+
+    // All batches are empty
+    String[][][] baseline = {{{}}};
+
+    RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+    try {
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+    } catch (Exception e) {
+      fail("Failed due to exception: " + e.getMessage());
+    }
+
+  }
+
+  @Test
+  public void testUnnestMultipleNewSchemaIncoming() {
+
+    // Schema changes in incoming have no effect on unnest unless the type of the
+    // unnest column itself has changed
+    Object[][] data = {
+        {
+            (Object) new String[] {"0", "1"},
+            (Object) new String[] {"2", "3", "4"}
+        },
+        {
+            (Object) new String[] {"5", "6" },
+        },
+        {
+            (Object) new String[] {"9"}
+        }
+    };
+
+    // Create input schema
+    TupleMetadata incomingSchema = new SchemaBuilder()
+        .add("someColumn", TypeProtos.MinorType.INT)
+        .addArray("unnestColumn", TypeProtos.MinorType.VARCHAR).buildSchema();
+    TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema, incomingSchema};
+
+    Object[][][] baseline = {
+        {
+          {1, 1, 2, 2, 2, 3, 3, 4},
+          {"0", "1", "2", "3", "4", "5", "6", "9"}
+        }
+    };
+
+    RecordBatch.IterOutcome[] iterOutcomes = {
+        RecordBatch.IterOutcome.OK_NEW_SCHEMA,
+        RecordBatch.IterOutcome.OK,
+        RecordBatch.IterOutcome.OK_NEW_SCHEMA};
+
+    try {
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+    } catch (Exception e) {
+      fail("Failed due to exception: " + e.getMessage());
+    }
+
+  }
+
+  @Test
+  public void testUnnestSchemaChange() {
+    Object[][] data = {
+        {
+            (Object) new String[] {"0", "1"},
+            (Object) new String[] {"2", "3", "4"}
+        },
+        {
+            (Object) new String[] {"5", "6" },
+        },
+        {
+            (Object) new int[] {9}
+        }
+    };
+
+    // Create input schema
+    TupleMetadata incomingSchema1 = new SchemaBuilder()
+        .add("someColumn", TypeProtos.MinorType.INT)
+        .addArray("unnestColumn", TypeProtos.MinorType.VARCHAR).buildSchema();
+    TupleMetadata incomingSchema2 = new SchemaBuilder()
+        .add("someColumn", TypeProtos.MinorType.INT)
+        .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
+
+    TupleMetadata[] incomingSchemas = {incomingSchema1, incomingSchema1, incomingSchema2};
+
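+    // The change of the unnest column from VARCHAR to INT forces a new output schema,
+    // so the baseline expects two output batches.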
+    Object[][][] baseline = {
+        {
+          {1, 1, 2, 2, 2, 3, 3},
+          {"0", "1", "2", "3", "4", "5", "6"}
+        },
+        {
+          {4},
+          {9}
+        }
+    };
+
+    RecordBatch.IterOutcome[] iterOutcomes = {
+        RecordBatch.IterOutcome.OK_NEW_SCHEMA,
+        RecordBatch.IterOutcome.OK,
+        RecordBatch.IterOutcome.OK_NEW_SCHEMA};
+
+    try {
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+    } catch (Exception e) {
+      fail("Failed due to exception: " + e.getMessage());
+    }
+
+  }
+
+  @Ignore("Batch limits need to be synced with the record batch sizer. Fix once the calculations are stabilized")
+  @Test
+  public void testUnnestLimitBatchSize() {
+
+    final int limitedOutputBatchSize = 1024;
+    final int inputBatchSize = 1024 + 1;
+    final int limitedOutputBatchSizeBytes = 1024 * (4 + 4 + 4 * inputBatchSize); // num rows * (size of int + size of
+                                                                                 // int + size of int * num entries in
+                                                                                 // array)
+    // single record batch with single row. The unnest column has one
+    // more record than the batch size we want in the output
+    Object[][] data = new Object[1][1];
+
+    for (int i = 0; i < data.length; i++) {
+      for (int j = 0; j < data[i].length; j++) {
+        data[i][j] = new int[inputBatchSize];
+        for (int k = 0; k < inputBatchSize; k++) {
+          ((int[])data[i][j])[k] = k;
+        }
+      }
+    }
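+    // Expect two output batches: the first filled to the limited batch size, the
+    // second carrying the single overflow value.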
+    Integer[][][] baseline = new Integer[2][2][];
+    baseline[0][0] = new Integer[limitedOutputBatchSize];
+    baseline[0][1] = new Integer[limitedOutputBatchSize];
+    baseline[1][0] = new Integer[1];
+    baseline[1][1] = new Integer[1];
+    for (int i = 0; i < limitedOutputBatchSize; i++) {
+      baseline[0][0][i] = 1;
+      baseline[0][1][i] = i;
+    }
+    baseline[1][0][0] = 1; // row Num
+    baseline[1][1][0] = limitedOutputBatchSize; // value
+
+    // Create input schema
+    TupleMetadata incomingSchema = new SchemaBuilder()
+        .add("rowNumber", TypeProtos.MinorType.INT)
+        .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
+
+    TupleMetadata[] incomingSchemas = {incomingSchema};
+
+    RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK};
+
+    final long outputBatchSize = fixture.getFragmentContext().getOptions().getOption(ExecConstants
+        .OUTPUT_BATCH_SIZE_VALIDATOR);
+    fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
+
+    try {
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+    } catch (Exception e) {
+      fail("Failed due to exception: " + e.getMessage());
+    } finally {
+      fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
+    }
+
+  }
+
+  @Test
+  // Limit sends a kill. Unnest has more than one record batch for a record when
+  // the kill is sent.
+  public void testUnnestKillFromLimitSubquery1() {
+
+    // Similar to the previous test: we split a record across more than one batch,
+    // but we also set a limit less than the size of the batch so only one batch gets output.
+    final int limitedOutputBatchSize = 1024;
+    final int inputBatchSize = 1024 + 1;
+    final int limitedOutputBatchSizeBytes = 1024 * (4 + 4 + 4 * inputBatchSize); // num rows * (size of int + size of
+                                                                                 // int + size of int * num entries in
+                                                                                 // array)
+    // single record batch with single row. The unnest column has one
+    // more record than the batch size we want in the output
+    Object[][] data = new Object[1][1];
+
+    for (int i = 0; i < data.length; i++) {
+      for (int j = 0; j < data[i].length; j++) {
+        data[i][j] = new int[inputBatchSize];
+        for (int k = 0; k < inputBatchSize; k++) {
+          ((int[])data[i][j])[k] = k;
+        }
+      }
+    }
+
+    // because of kill we only get one batch back
+    Integer[][][] baseline = new Integer[1][2][];
+    baseline[0][0] = new Integer[limitedOutputBatchSize];
+    baseline[0][1] = new Integer[limitedOutputBatchSize];
+    for (int i = 0; i < limitedOutputBatchSize; i++) {
+      baseline[0][0][i] = 1;
+      baseline[0][1][i] = i;
+    }
+
+    // Create input schema
+    TupleMetadata incomingSchema = new SchemaBuilder()
+        .add("rowNumber", TypeProtos.MinorType.INT)
+        .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
+
+    TupleMetadata[] incomingSchemas = {incomingSchema};
+
+    RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK};
+
+    final long outputBatchSize = fixture.getFragmentContext().getOptions().getOption(ExecConstants
+        .OUTPUT_BATCH_SIZE_VALIDATOR);
+    fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
+
+    try {
+      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); // kill the execution after the first batch
+    } catch (Exception e) {
+      fail("Failed due to exception: " + e.getMessage());
+    } finally {
+      fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
+    }
+
+  }
+
+  @Test
+  // Limit sends a kill. Unnest has exactly one record batch for a record when
+  // the kill is sent. This test is actually useless since it tests the behaviour of
+  // lateral which doesn't send kill at all if it gets an EMIT. We expect limit
+  // to do so, so let's keep the test to demonstrate the expected behaviour.
+  public void testUnnestKillFromLimitSubquery2() {
+
+    // Similar to the previous test, but the size of the array fits exactly into the record batch.
+
+    final int limitedOutputBatchSize = 1024;
+    final int inputBatchSize = 1024;
+    final int limitedOutputBatchSizeBytes = 1024 * (4 + 4 + 4 * inputBatchSize); // num rows * (size of int + size of
+                                                                                 // int + size of int * num entries in
+                                                                                 // array)
+    // single record batch with a single row. The unnest column has exactly as many
+    // values as the batch size we want in the output
+    Object[][] data = new Object[1][1];
+
+    for (int i = 0; i < data.length; i++) {
+      for (int j = 0; j < data[i].length; j++) {
+        data[i][j] = new int[inputBatchSize];
+        for (int k = 0; k < inputBatchSize; k++) {
+          ((int[])data[i][j])[k] = k;
+        }
+      }
+    }
+
+    // because of kill we only get one batch back
+    Integer[][][] baseline = new Integer[1][2][];
+    baseline[0][0] = new Integer[limitedOutputBatchSize];
+    baseline[0][1] = new Integer[limitedOutputBatchSize];
+    for (int i = 0; i < limitedOutputBatchSize; i++) {
+      baseline[0][0][i] = 1;
+      baseline[0][1][i] = i;
+    }
+
+    // Create input schema
+    TupleMetadata incomingSchema = new SchemaBuilder()
+        .add("rowNumber", TypeProtos.MinorType.INT)
+        .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
+
+    TupleMetadata[] incomingSchemas = {incomingSchema};
+
+    RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK};
+
+    final long outputBatchSize = fixture.getFragmentContext().getOptions().getOption(ExecConstants
+        .OUTPUT_BATCH_SIZE_VALIDATOR);
+    fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
+
+    try {
+      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); // kill the execution after the first batch
+    } catch (Exception e) {
+      fail("Failed due to exception: " + e.getMessage());
+    } finally {
+      fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
+    }
+
+  }
+
+
+  @Test
+  public void testUnnestNonArrayColumn() {
+
+    Object[][] data = {
+        { (Object) new Integer(1),
+            (Object) new Integer(3)},
+        { (Object) new Integer(6),
+            (Object) new Integer(10)}
+    };
+
+    // Create input schema
+    TupleMetadata incomingSchema =
+        new SchemaBuilder()
+            .add("rowNumber", TypeProtos.MinorType.INT)
+            .add("unnestColumn", TypeProtos.MinorType.INT)
+            .buildSchema();
+    TupleMetadata[] incomingSchemas = { incomingSchema, incomingSchema };
+
+    // We expect an Exception
+    Integer[][][] baseline = {};
+
+    RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+    try {
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+    } catch (UserException|UnsupportedOperationException e) {
+      return; // succeeded
+    } catch (Exception e) {
+      fail("Failed due to exception: " + e.getMessage());
+    }
+
+  }
+
+
+  // test unnest for various input conditions without invoking kill
+  private <T> void testUnnest(
+      TupleMetadata[] incomingSchemas,
+      RecordBatch.IterOutcome[] iterOutcomes,
+      T[][] data,
+      T[][][] baseline) throws Exception {
+    testUnnest(incomingSchemas, iterOutcomes, -1, -1, data, baseline);
+  }
+
+  // Test unnest for various input conditions, optionally invoking kill. If the execKill
+  // parameter is greater than 0, the pipeline is sent a kill after that many batches have been processed.
+  private <T> void testUnnest(TupleMetadata[] incomingSchemas,
+      RecordBatch.IterOutcome[] iterOutcomes,
+      int unnestLimit, // kill unnest after every 'unnestLimit' number of values in every record
+      int execKill, // number of batches after which to kill the execution (!)
+      T[][] data,
+      T[][][] baseline) throws Exception {
+
+    // Get the incoming container with dummy data for LJ
+    final List<VectorContainer> incomingContainer = new ArrayList<>(data.length);
+
+    // Create data
+    ArrayList<RowSet.SingleRowSet> rowSets = new ArrayList<>();
+    int rowNumber = 0;
+    int batchNum = 0;
+    for (Object[] recordBatch : data) {
+      RowSetBuilder rowSetBuilder = fixture.rowSetBuilder(incomingSchemas[batchNum]);
+      for (Object rowData : recordBatch) {
+        rowSetBuilder.addRow(++rowNumber, rowData);
+      }
+      RowSet.SingleRowSet rowSet = rowSetBuilder.build();
+      rowSets.add(rowSet);
+      incomingContainer.add(rowSet.container());
+      batchNum++;
+    }
+
+    // Get the unnest POPConfig
+    final UnnestPOP unnestPopConfig = new UnnestPOP(null, SchemaPath.getCompoundPath("unnestColumn"));
+
+    // Get the IterOutcomes for LJ
+    final List<RecordBatch.IterOutcome> outcomes = new ArrayList<>(iterOutcomes.length);
+    for (RecordBatch.IterOutcome o : iterOutcomes) {
+      outcomes.add(o);
+    }
+
+    // Create incoming MockRecordBatch
+    final MockRecordBatch incomingMockBatch =
+        new MockRecordBatch(fixture.getFragmentContext(), operatorContext, incomingContainer, outcomes,
+            incomingContainer.get(0).getSchema());
+
+    // setup Unnest record batch
+    final UnnestRecordBatch unnestBatch =
+        new UnnestRecordBatch(unnestPopConfig, fixture.getFragmentContext());
+
+    final LateralJoinBatch lateralJoinBatch =
+        new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), incomingMockBatch, unnestBatch);
+
+    // set pointer to Lateral in unnest
+    unnestBatch.setIncoming((LateralContract) lateralJoinBatch);
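+    // Unnest does not read its input directly; it gets the current row through the
+    // LateralContract implemented by the lateral join batch.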
+
+    // Simulate the pipeline by calling next on the incoming
+
+    // results is a list of batches, each batch being a list of output vectors.
+    List<List<ValueVector>> resultList = new ArrayList<>();
+    List<List<ValueVector>> results = null;
+    int batchesProcessed = 0;
+    try {
+    try {
+      while (!isTerminal(lateralJoinBatch.next())) {
+        if (lateralJoinBatch.getRecordCount() > 0) {
+          addBatchToResults(resultList, lateralJoinBatch);
+        }
+        batchesProcessed++;
+        if (batchesProcessed == execKill) {
+          lateralJoinBatch.getContext().getExecutorState().fail(new DrillException("Testing failure of execution."));
+          lateralJoinBatch.kill(true);
+        }
+        // else nothing to do
+      }
+    } catch (UserException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new Exception("Test failed to execute lateralJoinBatch.next() because: " + e.getMessage());
+    }
+
+      // Check results against baseline
+      results = resultList;
+
+      int batchIndex = 0;
+      int vectorIndex = 0;
+      //int valueIndex = 0;
+      for (List<ValueVector> batch : results) {
+        int vectorCount = batch.size();
+        if (vectorCount != baseline[batchIndex].length + 1) { // baseline does not include the original unnest column
+          fail("Test failed in validating unnest output. Batch column count 
mismatch.");
+        }
+        for (ValueVector vv : batch) {
+          if (vv.getField().getName().equals("unnestColumn")) {
+            continue; // skip the original input column
+          }
+          int valueCount = vv.getAccessor().getValueCount();
+          if (valueCount != baseline[batchIndex][vectorIndex].length) {
+            fail("Test failed in validating unnest output. Value count mismatch in batch number " + (batchIndex + 1) + ".");
+          }
+
+          for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
+            if (vv instanceof MapVector) {
+              if (!compareMapBaseline(baseline[batchIndex][vectorIndex][valueIndex], vv
+                  .getAccessor()
+                  .getObject(valueIndex))) {
+                fail("Test failed in validating unnest(Map) output. Value mismatch");
+              }
+            } else if (vv instanceof VarCharVector) {
+              Object val = vv.getAccessor().getObject(valueIndex);
+              if (((String) baseline[batchIndex][vectorIndex][valueIndex]).compareTo(val.toString()) != 0) {
+                fail("Test failed in validating unnest output. Value mismatch. Baseline value[" + vectorIndex + "][" + valueIndex
+                    + "]: " + baseline[batchIndex][vectorIndex][valueIndex] + "   VV.getObject(valueIndex): " + val);
+              }
+            } else {
+              Object val = vv.getAccessor().getObject(valueIndex);
+              if (!baseline[batchIndex][vectorIndex][valueIndex].equals(val)) {
+                fail("Test failed in validating unnest output. Value mismatch. 
Baseline value[" + vectorIndex + "][" + valueIndex
+                    + "]" + ": "
+                    + 
((Object[])baseline[batchIndex][vectorIndex])[valueIndex] + "   
VV.getObject(valueIndex): " + val);
+              }
+            }
+          }
+          vectorIndex++;
+        }
+        vectorIndex = 0;
+        batchIndex++;
+      }
+    } catch (UserException e) {
+      throw e; // Valid exception
+    } catch (Exception e) {
+      fail("Test failed. Exception : " + e.getMessage());
+    } finally {
+      // Close all the resources for this test case
+      unnestBatch.close();
+      lateralJoinBatch.close();
+      incomingMockBatch.close();
+
+      if (results != null) {
+        for (List<ValueVector> batch : results) {
+          for (ValueVector vv : batch) {
+            vv.clear();
+          }
+        }
+      }
+      for (RowSet.SingleRowSet rowSet : rowSets) {
+        rowSet.clear();
+      }
+    }
+
+  }
+
+  /**
+   * Build a schema with a repeated map -
+   *
+   *  {
+   *    rowNum,
+   *    mapColumn : [
+   *       {
+   *         colA,
+   *         colB : [
+   *            varcharCol
+   *         ]
+   *       }
+   *    ]
+   *  }
+   *
+   * @see TestResultSetLoaderMapArray TestResultSetLoaderMapArray for similar schema and data
+   * @return TupleMetadata corresponding to the schema
+   */
+  private TupleMetadata getRepeatedMapSchema() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("rowNum", TypeProtos.MinorType.INT)
+        .addMapArray("unnestColumn")
+        .add("colA", TypeProtos.MinorType.INT)
+        .addArray("colB", TypeProtos.MinorType.VARCHAR)
+        .resumeSchema()
+        .buildSchema();
+    return schema;
+  }
+
+  private Object[][] getMapData() {
+
+    Object[][] d = {
+      {
+          new Object[] {},
+          new Object[] {
+              new Object[] {11, new String[] {"1.1.1", "1.1.2" }},
+              new Object[] {12, new String[] {"1.2.1", "1.2.2" }}
+          },
+
+          new Object[] {
+              new Object[] {21, new String[] {"2.1.1", "2.1.2" }},
+              new Object[] {22, new String[] {}},
+              new Object[] {23, new String[] {"2.3.1", "2.3.2" }}
+          }
+      },
+      {
+        new Object[] {
+            new Object[] {31, new String[] {"3.1.1", "3.1.2" }},
+            new Object[] {32, new String[] {"3.2.1", "3.2.2" }}
+        }
+      }
+    };
+
+    return d;
+  }
+
+  private Object[][][] getMapBaseline() {
+
+    Object[][][] d = {
+      {
+          {2,2,3,3,3,4,4},
+          {
+            "{\"colA\":11,\"colB\":[\"1.1.1\",\"1.1.2\"]}",
+              "{\"colA\":12,\"colB\":[\"1.2.1\",\"1.2.2\"]}",
+              "{\"colA\":21,\"colB\":[\"2.1.1\",\"2.1.2\"]}",
+              "{\"colA\":22,\"colB\":[]}",
+              "{\"colA\":23,\"colB\":[\"2.3.1\",\"2.3.2\"]}",
+              "{\"colA\":31,\"colB\":[\"3.1.1\",\"3.1.2\"]}",
+              "{\"colA\":32,\"colB\":[\"3.2.1\",\"3.2.2\"]}"
+          }
+      }
+    };
+    return d;
+  }
+
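+  // Map values are compared through their string form: the object returned by
+  // getObject() on the map vector renders as the JSON-like text that getMapBaseline()
+  // lists as the expected values.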
+  private boolean compareMapBaseline(Object baselineValue, Object vector) {
+    String vv = vector.toString();
+    String b = (String)baselineValue;
+    return vv.equalsIgnoreCase(b);
+  }
+
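+  // Holds onto the vectors of the current output batch (via RecordBatchData) so they
+  // can be validated after the pipeline has moved on; the copy is released if anything
+  // goes wrong, to avoid a memory leak.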
+  private int addBatchToResults(List<List<ValueVector>> resultList, RecordBatch inputBatch) {
+    int count = 0;
+    final RecordBatchData batchCopy = new RecordBatchData(inputBatch, operatorContext.getAllocator());
+    boolean success = false;
+    try {
+      count = batchCopy.getRecordCount();
+      resultList.add(batchCopy.getVectors());
+      success = true;
+    } finally {
+      if (!success) {
+        batchCopy.clear();
+      }
+    }
+    return count;
+  }
+
+  private boolean isTerminal(RecordBatch.IterOutcome outcome) {
+    return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP) || (outcome
+        == RecordBatch.IterOutcome.OUT_OF_MEMORY);
+  }
+
+}
+
