This is an automated email from the ASF dual-hosted git repository. parthc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 0bdebf27944396d69fa4926d1bf2da5899e03033 Author: Sorabh Hamirwasia <[email protected]> AuthorDate: Thu Jun 21 22:57:00 2018 -0700 DRILL-6535: ClassCastException in Lateral Unnest queries when dealing with schema changed json data Note: The issue occurred because, for a given left incoming batch, all right batches were filtered out, so outputIndex was still 0 when the next left incoming batch arrived with OK_NEW_SCHEMA. That OK_NEW_SCHEMA outcome was then consumed without updating the output container schema. This closes #1339 --- .../exec/physical/impl/join/LateralJoinBatch.java | 37 ++++++++++-- .../impl/join/TestLateralJoinCorrectness.java | 67 ++++++++++++++++++++++ 2 files changed, 100 insertions(+), 4 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index 578cbc8..84dc5c3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -433,6 +433,14 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> rightUpstream = next(RIGHT_INDEX, right); switch (rightUpstream) { case OK_NEW_SCHEMA: + + // If there is some records in the output batch that means left batch didn't came with OK_NEW_SCHEMA, + // otherwise it would have been marked for processInFuture and output will be returned. This means for + // current non processed left or new left non-empty batch there is unexpected right batch schema change + if (outputIndex > 0) { + throw new IllegalStateException("SchemaChange on right batch is not expected in between the rows of " + + "current left batch or a new non-empty left batch with no schema change"); + } // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. 
So there won't be a // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through // @@ -548,6 +556,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Get both left batch and the right batch and make sure indexes are properly set leftUpstream = processLeftBatch(); + // output batch is not empty and we have new left batch with OK_NEW_SCHEMA or terminal outcome if (processLeftBatchInFuture) { logger.debug("Received left batch with outcome {} such that we have to return the current outgoing " + "batch and process the new batch in subsequent next call", leftUpstream); @@ -564,7 +573,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // If we have received the left batch with EMIT outcome and is empty then we should return previous output // batch with EMIT outcome - if (leftUpstream == EMIT && left.getRecordCount() == 0) { + if ((leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) && left.getRecordCount() == 0) { isLeftProcessed = true; break; } @@ -579,10 +588,16 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // left in outgoing batch so let's get next right batch. // 2) OR previous left & right batch was fully processed and it came with OK outcome. There is space in outgoing // batch. Now we have got new left batch with OK outcome. Let's get next right batch - // - // It will not hit OK_NEW_SCHEMA since left side have not seen that outcome + // 3) OR previous left & right batch was fully processed and left came with OK outcome. Outgoing batch is + // empty since all right batches were empty for all left rows. Now we got another non-empty left batch with + // OK_NEW_SCHEMA. rightUpstream = processRightBatch(); - Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change in right branch"); + if (rightUpstream == OK_NEW_SCHEMA) { + leftUpstream = (leftUpstream != EMIT) ? 
OK : leftUpstream; + rightUpstream = OK; + finalizeOutputContainer(); + return OK_NEW_SCHEMA; + } if (isTerminalOutcome(rightUpstream)) { finalizeOutputContainer(); @@ -591,6 +606,17 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Update the batch memory manager to use new right incoming batch updateMemoryManager(RIGHT_INDEX); + + // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in + // output container based on new left schema and old right schema. If schema change failed then return STOP + // downstream + if (leftUpstream == OK_NEW_SCHEMA && isLeftProcessed) { + if (!handleSchemaChange()) { + return STOP; + } + // Since schema has change so we have new empty vectors in output container hence allocateMemory for them + allocateVectors(); + } } } // output batch is full to its max capacity @@ -735,6 +761,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName()); colSize.allocateVector(w.getValueVector(), maxOutputRowCount); } + + logger.debug("Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", container.getAllocator().getAllocatedMemory(), + container.getAllocator().getPeakMemoryAllocation()); } private boolean setBatchState(IterOutcome outcome) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java index caa8137..2723e30 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java @@ -2803,4 +2803,71 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { leftOutcomes2.clear(); } } + + /** + * Test to verify that 
for first left incoming if there is no right side incoming batch and then second left + * incoming comes with schema change, then the schema change with empty output batch for first incoming is handled + * properly. + * @throws Exception + */ + @Test + public void testLateral_SchemaChange_Left_EmptyRightBatchForFirst() throws Exception { + // Create left input schema 2 + TupleMetadata leftSchema2 = new SchemaBuilder() + .add("id_left", TypeProtos.MinorType.INT) + .add("cost_left", TypeProtos.MinorType.VARCHAR) + .add("name_left", TypeProtos.MinorType.VARCHAR) + .buildSchema(); + + + // Create data for left input + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2) + .addRow(2, "20", "item20") + .build(); + + // Get the left container with dummy data for Lateral Join + leftContainer.add(nonEmptyLeftRowSet.container()); + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + // first OK_NEW_SCHEMA batch + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container()); // non-empty OK_NEW_SCHEMA batch + rightContainer.add(emptyRightRowSet.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinBatch ljBatch = new 
LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + // This means 2 output record batches were received because of Schema change + assertEquals(3, ljBatch.getRecordCount()); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + leftRowSet2.clear(); + } + } }
