ilooner closed pull request #1418: DRILL-6654: Data verification failure with lateral unnest query havin…
URL: https://github.com/apache/drill/pull/1418
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 916585088f1..f30616bacd4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -136,12 +136,14 @@ public AggOutcome doWork(IterOutcome outerOutcome) {
outcome = out;
return AggOutcome.RETURN_OUTCOME;
case EMIT:
+ outerOutcome = EMIT;
if (incoming.getRecordCount() == 0) {
          // When we see an EMIT we let the agg record batch know that it should either
          // send out an EMIT or an OK_NEW_SCHEMA, followed by an EMIT. To do that we simply return
          // RETURN_AND_RESET with the outcome so the record batch can take care of it.
return setOkAndReturnEmit();
} else {
+ currentIndex = this.getVectorIndex(underlyingIndex);
break outer;
}
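
For readers scanning the diff: the production change above does two things when the upstream operator signals EMIT. It records that the current boundary is an EMIT (outerOutcome = EMIT), and, for a non-empty batch, it resolves the current row position through the batch's selection vector (getVectorIndex) before aggregating, which appears to be what went wrong for SV2 batches in the lateral/unnest case. The following is a minimal standalone sketch of that idea under those assumptions, not Drill code; SimpleStreamingAgg, Outcome, and onUpstreamEmit are hypothetical names.

enum Outcome { OK, OK_NEW_SCHEMA, EMIT, NONE }

final class SimpleStreamingAgg {
  private Outcome outerOutcome = Outcome.OK;
  private int currentIndex;

  /**
   * Handles an upstream EMIT. Returns true when the caller should flush the
   * accumulated aggregate and reset; false when it should keep consuming rows.
   */
  boolean onUpstreamEmit(int recordCount, int[] selectionVector, int underlyingIndex) {
    outerOutcome = Outcome.EMIT;                 // mirrors "outerOutcome = EMIT;"
    if (recordCount == 0) {
      return true;                               // mirrors setOkAndReturnEmit()
    }
    // Mirrors "currentIndex = this.getVectorIndex(underlyingIndex);": for an
    // SV2 batch the real row position comes from the selection vector.
    currentIndex = selectionVector != null ? selectionVector[underlyingIndex] : underlyingIndex;
    return false;
  }

  int currentIndex() {
    return currentIndex;
  }

  Outcome outerOutcome() {
    return outerOutcome;
  }
}
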
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
index 2183efa1db5..cead984cc5c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -24,6 +24,7 @@
import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.rowSet.DirectRowSet;
@@ -1164,4 +1165,140 @@ public void t20_testStreamingAggrWithEmptyDataSet() {
assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
}
+  /**
+   * Streaming aggregate with no group by over runs of empty and non-empty SV2 batches,
+   * interleaved with EMIT outcomes.
+   */
+ @Test
+ public void t21_testStreamingAggrRunsofEmpty_NonEmpty_Sv2() {
+ TupleMetadata inputSchema_sv2 = new SchemaBuilder()
+ .add("id_left", TypeProtos.MinorType.INT)
+ .add("cost_left", TypeProtos.MinorType.INT)
+ .add("name_left", TypeProtos.MinorType.VARCHAR)
+ .withSVMode(BatchSchema.SelectionVectorMode.TWO_BYTE)
+ .buildSchema();
+
+    final RowSet.SingleRowSet emptyRowSet_Sv2 = operatorFixture.rowSetBuilder(inputSchema_sv2)
+ .withSv2()
+ .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema_sv2)
+ .addSelection(false, 2, 20, "item2")
+ .addSelection(true, 3, 30, "item3")
+ .withSv2()
+ .build();
+
+ inputContainer.add(emptyRowSet_Sv2.container());
+ inputContainer.add(emptyRowSet_Sv2.container());
+ inputContainer.add(emptyRowSet_Sv2.container());
+ inputContainer.add(emptyRowSet_Sv2.container());
+ inputContainer.add(emptyRowSet_Sv2.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+ inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+ inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+ inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+ inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+ .addRow((long)33)
+ .build();
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(0, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // For special batch.
+ assertEquals(1, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ nonEmptyInputRowSet2.clear();
+ emptyRowSet_Sv2.clear();
+ expectedRowSet.clear();
+ }
+
+  /**
+   * Streaming aggregate with no group by over runs of empty and non-empty batches (no SV2),
+   * interleaved with EMIT outcomes.
+   */
+ @Test
+ public void t22_testStreamingAggrRunsOfEmpty_NonEmpty() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .build();
+
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+ .build();
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, inputContainer.get(0).getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(0, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ nonEmptyInputRowSet2.clear();
+ expectedRowSet.clear();
+ }
}
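
A note on the expected value in the first new test above: the SV2 row set marks the (2, 20, "item2") row as unselected and the (3, 30, "item3") row as selected, so only the selected row is visible through the two-byte selection vector and sum(id_left + cost_left) evaluates to 3 + 30 = 33, the single value in expectedRowSet. Below is a tiny self-contained sketch of that arithmetic only; ExpectedSumSketch is a made-up name for illustration, not part of the PR.

public final class ExpectedSumSketch {
  public static void main(String[] args) {
    int[][] rows = { {2, 20}, {3, 30} };     // (id_left, cost_left) as added to the row set
    boolean[] selected = { false, true };    // first argument passed to addSelection(...)
    long totalSum = 0;
    for (int i = 0; i < rows.length; i++) {
      if (selected[i]) {
        totalSum += rows[i][0] + rows[i][1]; // sum(id_left + cost_left) over selected rows
      }
    }
    System.out.println(totalSum);            // 33 -- matches expectedRowSet.addRow((long) 33)
  }
}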
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services