[ https://issues.apache.org/jira/browse/DRILL-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569271#comment-16569271 ]
ASF GitHub Bot commented on DRILL-6653:
---------------------------------------

sohami closed pull request #1422: DRILL-6653: Unsupported Schema change exception where there is no sch…
URL: https://github.com/apache/drill/pull/1422

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 05a1f1267de..acfdc878aa6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -51,7 +51,11 @@ public int getRecordCount() {
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    container.clear();
+    // Don't clear the container just because an OK_NEW_SCHEMA was received from upstream. When only the
+    // container type changes but the actual schema does not, RemovingRecordBatch should consume the
+    // OK_NEW_SCHEMA and send OK downstream instead. Since the output of RemovingRecordBatch is always a
+    // regular container, a change in the incoming container type is not an actual schema change.
+    container.zeroVectors();
     switch(incoming.getSchema().getSelectionVectorMode()){
     case NONE:
       this.copier = getStraightCopier();
@@ -66,6 +70,8 @@ protected boolean setupNewSchema() throws SchemaChangeException {
       throw new UnsupportedOperationException();
     }
 
+    // If there is an actual schema change then the condition below will be true and OK_NEW_SCHEMA will
+    // also be sent downstream.
     if (container.isSchemaChanged()) {
       container.buildSchema(SelectionVectorMode.NONE);
       return true;
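To make the behavioral contract concrete, here is a minimal, illustrative consumer loop (not part of the patch; {{rebuildTransfers}} and {{consumeRows}} are hypothetical helpers). After the fix, an SV2-to-none transition with an unchanged column schema surfaces to such a loop as OK, so it rebuilds downstream state only on a real schema change:

{code}
import org.apache.drill.exec.record.RecordBatch;

// Illustrative sketch only, not part of this patch; assumes a Drill classpath.
abstract class DownstreamSketch {
  abstract void rebuildTransfers(RecordBatch upstream); // hypothetical: rebuild vectors/generated code
  abstract void consumeRows(RecordBatch upstream);      // hypothetical: process the records

  void drain(RecordBatch upstream) {
    while (true) {
      RecordBatch.IterOutcome outcome = upstream.next();
      switch (outcome) {
      case OK_NEW_SCHEMA:
        rebuildTransfers(upstream);
        // fall through: an OK_NEW_SCHEMA batch may also carry rows
      case OK:
        consumeRows(upstream);
        break;
      case NONE:
        return; // end of stream
      default:
        throw new IllegalStateException("Unexpected outcome: " + outcome);
      }
    }
  }
}
{code}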
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemoverIterOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemoverIterOutcome.java
new file mode 100644
index 00000000000..6613a71ca24
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemoverIterOutcome.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSVRemoverIterOutcome extends BaseTestOpBatchEmitOutcome {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSVRemoverIterOutcome.class);
+
+  // Holds a reference to the actual operator instance created for each test
+  private static RemovingRecordBatch removingRecordBatch;
+
+  // List of expected outcomes populated by each test. Used to verify the actual IterOutcome returned by each
+  // next() call on the operator against the expected outcome
+  private final List<RecordBatch.IterOutcome> expectedOutcomes = new ArrayList<>();
+
+  // List of expected row counts populated by each test. Used to verify the actual output row count against the
+  // expected row count
+  private final List<Integer> expectedRecordCounts = new ArrayList<>();
+
+  // List of expected row sets populated by each test. Used to verify the actual output of the operator against
+  // the expected output
+  private final List<RowSet> expectedRowSets = new ArrayList<>();
+
+  /**
+   * Cleanup method executed after each test
+   */
+  @After
+  public void afterTestCleanup() {
+    // Close the removing record batch
+    removingRecordBatch.close();
+
+    // Release memory from expectedRowSets
+    for (RowSet expectedRowSet : expectedRowSets) {
+      expectedRowSet.clear();
+    }
+    expectedOutcomes.clear();
+    expectedRecordCounts.clear();
+    expectedRowSets.clear();
+  }
+
+  private void testSVRemoverCommon() {
+    final SelectionVectorRemover svRemover = new SelectionVectorRemover(null);
+    final MockRecordBatch batch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, inputContainer,
+      inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema());
+
+    removingRecordBatch = new RemovingRecordBatch(svRemover, fragContext, batch);
+
+    int i = 0;
+    int expectedRowSetIndex = 0;
+    while (i < expectedOutcomes.size()) {
+      try {
+        assertEquals(expectedOutcomes.get(i), removingRecordBatch.next());
+        assertEquals(removingRecordBatch.getRecordCount(), (int) expectedRecordCounts.get(i++));
+
+        if (removingRecordBatch.getRecordCount() > 0) {
+          final RowSet actualRowSet = DirectRowSet.fromContainer(removingRecordBatch.getContainer());
+          new RowSetComparison(expectedRowSets.get(expectedRowSetIndex++)).verify(actualRowSet);
+        }
+      } finally {
+        removingRecordBatch.getContainer().zeroVectors();
+      }
+    }
+  }
+
+  @Test
+  public void test_SimpleContainer_NoSchemaChange() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Expected row sets
+    final RowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    expectedRowSets.add(expectedRowSet);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet.rowCount());
+
+    testSVRemoverCommon();
+  }
+
+  @Test
+  public void test_SimpleContainer_SchemaChange() {
+    inputContainer.add(emptyInputRowSet.container());
+
+    TupleMetadata inputSchema2 = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.VARCHAR)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    RowSet nonEmpty2 = operatorFixture.rowSetBuilder(inputSchema2)
+      .addRow(1, "10", "item1")
+      .addRow(2, "20", "item2")
+      .build();
+
+    inputContainer.add(nonEmpty2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Expected row sets
+    final RowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema2)
+      .addRow(1, "10", "item1")
+      .addRow(2, "20", "item2")
+      .build();
+
+    expectedRowSets.add(expectedRowSet);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet.rowCount());
+
+    testSVRemoverCommon();
+  }
+
+  @Test
+  public void test_SV2Container_NoSchemaChange() {
+    final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(false, 1, 100, "item100")
+      .addSelection(true, 1, 101, "item101")
+      .addSelection(false, 1, 102, "item102")
+      .addSelection(true, 1, 103, "item103")
+      .addSelection(false, 2, 200, "item200")
+      .addSelection(true, 2, 201, "item201")
+      .addSelection(true, 2, 202, "item202")
+      .withSv2()
+      .build();
+
+    inputContainer.add(emptyWithSv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Expected row sets
+    final RowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 101, "item101")
+      .addRow(1, 103, "item103")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    expectedRowSets.add(expectedRowSet);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet.rowCount());
+
+    testSVRemoverCommon();
+  }
+
+  @Test
+  public void test_SV2Container_SchemaChange() {
+    // Batch for schema 1
+    final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(false, 1, 100, "item100")
+      .addSelection(true, 1, 101, "item101")
+      .addSelection(false, 1, 102, "item102")
+      .addSelection(true, 1, 103, "item103")
+      .addSelection(false, 2, 200, "item200")
+      .addSelection(true, 2, 201, "item201")
+      .addSelection(true, 2, 202, "item202")
+      .withSv2()
+      .build();
+
+    // Batch for schema 2
+    TupleMetadata inputSchema2 = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.VARCHAR)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet empty2WithSv2 = operatorFixture.rowSetBuilder(inputSchema2)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmpty2InputRowSet2 = operatorFixture.rowSetBuilder(inputSchema2)
+      .addSelection(true, 1, "101", "item101")
+      .addSelection(false, 1, "102", "item102")
+      .addSelection(true, 1, "103", "item103")
+      .addSelection(false, 2, "200", "item200")
+      .addSelection(true, 2, "201", "item201")
+      .withSv2()
+      .build();
+
+    inputContainer.add(emptyWithSv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(empty2WithSv2.container());
+    inputContainer.add(nonEmpty2InputRowSet2.container());
+
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+    inputContainerSv2.add(empty2WithSv2.getSv2());
+    inputContainerSv2.add(nonEmpty2InputRowSet2.getSv2());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Expected row sets
+    final RowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 101, "item101")
+      .addRow(1, 103, "item103")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema2)
+      .addRow(1, "101", "item101")
+      .addRow(1, "103", "item103")
+      .addRow(2, "201", "item201")
+      .build();
+
+    expectedRowSets.add(expectedRowSet);
+    expectedRowSets.add(expectedRowSet1);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet.rowCount());
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+
+    testSVRemoverCommon();
+  }
+}
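Assuming a standard Drill source checkout, the new test class can presumably be run on its own with Maven's surefire test filter (module path taken from the diff above):

{code}
mvn -pl exec/java-exec test -Dtest=TestSVRemoverIterOutcome
{code}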
> Unsupported Schema change exception where there is no schema change in lateral Unnest queries
> ---------------------------------------------------------------------------------------------
>
>                 Key: DRILL-6653
>                 URL: https://issues.apache.org/jira/browse/DRILL-6653
>             Project: Apache Drill
>          Issue Type: Bug
>    Affects Versions: 1.14.0
>            Reporter: Kedar Sankar Behera
>            Assignee: Sorabh Hamirwasia
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.15.0
>         Attachments: Plan2.pdf
>
> Unsupported Schema change exception where there is no schema change
> DataSet - A single json file (sf1)
> Query -
> {code}
> select customer.c_custkey, customer.c_name, sum(orders.totalprice) totalprice
> from customer, lateral (select t.o.o_totalprice as totalprice
>   from unnest(customer.c_orders) t(o) order by totalprice limit 10) orders
> group by customer.c_custkey, customer.c_name
> order by customer.c_custkey limit 50;
> {code}
> Result -
> {code}
> Exception:
> java.sql.SQLException: UNSUPPORTED_OPERATION ERROR: Hash aggregate does not support schema change
> Prior schema :
> BatchSchema [fields=[[`c_custkey` (VARCHAR:OPTIONAL)], [`c_name` (VARCHAR:OPTIONAL)], [`totalprice` (FLOAT8:OPTIONAL)]], selectionVector=NONE]
> New schema :
> BatchSchema [fields=[[`c_custkey` (VARCHAR:OPTIONAL)], [`c_name` (VARCHAR:OPTIONAL)], [`totalprice` (FLOAT8:OPTIONAL)]], selectionVector=NONE]
> Fragment 0:0
> [Error Id: 21d4d646-4e6a-4e4a-ba75-60ba247ddabd on drill191:31010]
>   at org.apache.drill.jdbc.impl.DrillCursor.nextRowInternally(DrillCursor.java:528)
>   at org.apache.drill.jdbc.impl.DrillCursor.next(DrillCursor.java:632)
>   at oadd.org.apache.calcite.avatica.AvaticaResultSet.next(AvaticaResultSet.java:207)
>   at org.apache.drill.jdbc.impl.DrillResultSetImpl.next(DrillResultSetImpl.java:153)
>   at org.apache.drill.test.framework.DrillTestJdbc.executeQuery(DrillTestJdbc.java:253)
>   at org.apache.drill.test.framework.DrillTestJdbc.run(DrillTestJdbc.java:115)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: oadd.org.apache.drill.common.exceptions.UserRemoteException: UNSUPPORTED_OPERATION ERROR: Hash aggregate does not support schema change
> Prior schema :
> BatchSchema [fields=[[`c_custkey` (VARCHAR:OPTIONAL)], [`c_name` (VARCHAR:OPTIONAL)], [`totalprice` (FLOAT8:OPTIONAL)]], selectionVector=NONE]
> New schema :
> BatchSchema [fields=[[`c_custkey` (VARCHAR:OPTIONAL)], [`c_name` (VARCHAR:OPTIONAL)], [`totalprice` (FLOAT8:OPTIONAL)]], selectionVector=NONE]
> Fragment 0:0
> [Error Id: 21d4d646-4e6a-4e4a-ba75-60ba247ddabd on drill191:31010]
>   at oadd.org.apache.drill.exec.rpc.user.QueryResultHandler.resultArrived(QueryResultHandler.java:123)
>   at oadd.org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:422)
>   at oadd.org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:96)
>   at oadd.org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:274)
>   at oadd.org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:244)
>   at oadd.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
>   at oadd.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
>   at oadd.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
>   at oadd.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:312)
>   at oadd.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:286)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
>   at oadd.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
>   at oadd.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
>   at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
>   at oadd.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
>   at oadd.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>   at oadd.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
>   at oadd.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>   at oadd.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>   at oadd.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>   at oadd.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>   ... 1 more
> {code}
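Note that the "Prior schema" and "New schema" in the error above print identically: the failure is triggered by the spurious mid-stream OK_NEW_SCHEMA outcome itself, not by an actual column difference, which is exactly what the RemovingRecordBatch change addresses. A rough sketch of that failure mode (illustrative only, not Drill's actual HashAgg source; {{onNewSchema}} is a hypothetical method):

{code}
import org.apache.drill.exec.record.BatchSchema;

// Illustrative sketch of the failure mode; not Drill's actual HashAgg code.
class SchemaChangeGuardSketch {
  private BatchSchema schema; // established on the first OK_NEW_SCHEMA

  void onNewSchema(BatchSchema incomingSchema) { // hypothetical method name
    if (schema != null) {
      // The aggregation state is already built against the first schema, so any
      // later OK_NEW_SCHEMA aborts the query, even when the schemas compare equal.
      throw new UnsupportedOperationException(String.format(
          "Hash aggregate does not support schema change%nPrior schema : %s%nNew schema : %s",
          schema, incomingSchema));
    }
    schema = incomingSchema;
  }
}
{code}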