[ https://issues.apache.org/jira/browse/FLINK-10261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16601802#comment-16601802 ]
ASF GitHub Bot commented on FLINK-10261:
----------------------------------------

xueyumusic opened a new pull request #6648: [FLINK-10261][table] fix insert into with order by
URL: https://github.com/apache/flink/pull/6648

## What is the purpose of the change

This PR tries to fix INSERT INTO with ORDER BY. The failure is caused by Calcite's validation, which rewrites the SqlNode in some cases, such as the ORDER_BY SQL kind. In these special cases, validate returns a new SqlNode of a different kind; however, Flink does not use this new SqlNode. In addition, `DataStreamSortRule` seems to require that the first sort attribute is a time attribute and that its order is ascending.

## Brief change log

- TableEnvironment validate

## Verifying this change

- test case

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> INSERT INTO does not work with ORDER BY clause
> ----------------------------------------------
>
> Key: FLINK-10261
> URL: https://issues.apache.org/jira/browse/FLINK-10261
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: xueyu
> Priority: Major
> Labels: pull-request-available
>
> It seems that INSERT INTO and ORDER BY do not work well together.
> An AssertionError is thrown and the ORDER BY clause is duplicated. I guess
> this is a Calcite issue.
> Example:
> {code}
> @Test
> def testInsertIntoMemoryTable(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>   MemoryTableSourceSinkUtil.clear()
>   val t = StreamTestData.getSmall3TupleDataStream(env)
>     .assignAscendingTimestamps(x => x._2)
>     .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
>   tEnv.registerTable("sourceTable", t)
>   val fieldNames = Array("d", "e", "f", "t")
>   val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
>     .asInstanceOf[Array[TypeInformation[_]]]
>   val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
>   tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
>   val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable ORDER BY a"
>   tEnv.sqlUpdate(sql)
>   env.execute()
> }
> {code}
> Error:
> {code}
> java.lang.AssertionError: not a query: SELECT `sourceTable`.`a`, `sourceTable`.`b`, `sourceTable`.`c`, `sourceTable`.`rowtime`
> FROM `sourceTable` AS `sourceTable`
> ORDER BY `a`
> ORDER BY `a`
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3069)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:557)
>     at org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:104)
>     at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:717)
>     at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
>     at org.apache.flink.table.runtime.stream.sql.SqlITCase.testInsertIntoMemoryTable(SqlITCase.scala:735)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
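
The cause named in the pull request (validation returns a new node of a different kind, and the caller keeps using the old one) can be illustrated with a small toy model. This is only a sketch under stated assumptions: `Node`, `Select`, `OrderBy`, `validate` and `toRel` are hypothetical stand-ins invented for illustration, not Calcite or Flink classes, and the direction of the rewrite (an `OrderBy` wrapper folded into a `Select`) is assumed rather than taken from the pull request.

```scala
// Toy model only: these types are hypothetical stand-ins, not Calcite or Flink classes.
sealed trait Node
final case class Select(fields: List[String], from: String, orderBy: List[String] = Nil) extends Node
final case class OrderBy(query: Node, keys: List[String]) extends Node

object ValidateSketch {
  // Assumption: validation may return a node of a different kind than its input.
  // Here an OrderBy wrapper around a Select is folded into a new Select.
  def validate(node: Node): Node = node match {
    case OrderBy(s: Select, keys) => s.copy(orderBy = keys)
    case other                    => other
  }

  // The converter in this sketch accepts only Select nodes and rejects anything else.
  def toRel(node: Node): String = node match {
    case Select(fields, from, orderBy) =>
      val sort = if (orderBy.isEmpty) "" else s" sort=[${orderBy.mkString(", ")}]"
      s"scan($from) project=[${fields.mkString(", ")}]$sort"
    case other =>
      throw new AssertionError(s"not a query: $other")
  }

  def main(args: Array[String]): Unit = {
    val parsed: Node =
      OrderBy(Select(List("a", "b", "c", "rowtime"), "sourceTable"), List("a"))

    // Problematic pattern: validate is called, but its result is discarded,
    // so the converter still sees the original OrderBy node.
    validate(parsed)
    val failed =
      try { toRel(parsed); false }
      catch { case _: AssertionError => true }
    println(s"conversion of the unvalidated node failed: $failed")

    // Pattern described by the fix: convert the node that validate returned.
    val validated = validate(parsed)
    println(toRel(validated))
  }
}
```

The point of the sketch is only the calling convention: when validation can hand back a different node, the later conversion step has to consume the returned node rather than the one that was passed in.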