[jira] [Created] (FLINK-20706) Separate the implementation of BatchExecUnion and StreamExecUnion
godfrey he created FLINK-20706: -- Summary: Separate the implementation of BatchExecUnion and StreamExecUnion Key: FLINK-20706 URL: https://issues.apache.org/jira/browse/FLINK-20706 Project: Flink Issue Type: Sub-task Reporter: godfrey he -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20705) Separate the implementation of BatchExecValues and StreamExecValues
godfrey he created FLINK-20705: -- Summary: Separate the implementation of BatchExecValues and StreamExecValues Key: FLINK-20705 URL: https://issues.apache.org/jira/browse/FLINK-20705 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.13.0
[jira] [Created] (FLINK-20693) Port BatchExecPythonCorrelate and StreamExecPythonCorrelate to Java
godfrey he created FLINK-20693: -- Summary: Port BatchExecPythonCorrelate and StreamExecPythonCorrelate to Java Key: FLINK-20693 URL: https://issues.apache.org/jira/browse/FLINK-20693 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.13.0
[jira] [Created] (FLINK-20690) Separate the implementation of BatchExecCorrelate and StreamExecCorrelate
godfrey he created FLINK-20690: -- Summary: Separate the implementation of BatchExecCorrelate and StreamExecCorrelate Key: FLINK-20690 URL: https://issues.apache.org/jira/browse/FLINK-20690 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.13.0
[jira] [Created] (FLINK-20673) ExecNode#getOutputType method should return LogicalType instead of RowType
godfrey he created FLINK-20673: -- Summary: ExecNode#getOutputType method should return LogicalType instead of RowType Key: FLINK-20673 URL: https://issues.apache.org/jira/browse/FLINK-20673 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Currently, {{ExecNode#getOutputType}} always returns {{RowType}}, but some nodes do not produce {{RowData}}, such as {{BatchExecSink}} and {{StreamExecSink}}. The output type of an {{ExecNode}} should be consistent with its type parameter {{T}}. So {{ExecNode#getOutputType}} should return {{LogicalType}} instead of {{RowType}}, and each subclass should cast the {{LogicalType}} to the specific type it needs when using it.
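The widened return type can be sketched in a few lines of self-contained Java, using simplified stand-ins (not Flink's actual `LogicalType`/`RowType` classes):

```java
public class OutputTypeSketch {
    // Simplified stand-ins for Flink's type classes (not the real API).
    interface LogicalType {}
    static class RowType implements LogicalType {
        final int fieldCount;
        RowType(int fieldCount) { this.fieldCount = fieldCount; }
    }
    static class VarCharType implements LogicalType {}

    // After the change, the interface exposes the broader LogicalType...
    interface ExecNode<T> {
        LogicalType getOutputType();
    }

    // ...and row-producing nodes cast it back where they need row details.
    static class CalcNode implements ExecNode<Object> {
        public LogicalType getOutputType() { return new RowType(2); }
        int fieldCount() {
            // each subclass casts LogicalType to the concrete type it expects
            return ((RowType) getOutputType()).fieldCount;
        }
    }

    // A sink node is no longer forced to pretend its output is a RowType.
    static class SinkNode implements ExecNode<String> {
        public LogicalType getOutputType() { return new VarCharType(); }
    }

    public static boolean sinkReturnsRowType() {
        return new SinkNode().getOutputType() instanceof RowType;
    }
    public static int calcFieldCount() {
        return new CalcNode().fieldCount();
    }
}
```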
[jira] [Created] (FLINK-20668) Introduce translateToExecNode method for FlinkPhysicalRel
godfrey he created FLINK-20668: -- Summary: Introduce translateToExecNode method for FlinkPhysicalRel Key: FLINK-20668 URL: https://issues.apache.org/jira/browse/FLINK-20668 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Currently, we use ExecGraphGenerator to translate a graph of FlinkPhysicalRels into an ExecNode graph. As more and more ExecNodes are supported, ExecGraphGenerator becomes hard to maintain. So we introduce FlinkPhysicalRel#translateToExecNode to create the specific ExecNode in each physical RelNode, while ExecGraphGenerator only builds the whole graph based on the translated ExecNodes, including connecting the input/output nodes, handling the rel visitor, handling some special nodes (such as Exchange), etc.
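The proposed split of responsibilities can be illustrated with a minimal self-contained Java sketch; the class and method names below are simplified stand-ins for the planner classes, not Flink's real API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class TranslateSketch {
    public interface ExecNode {
        List<ExecNode> getInputNodes();
        String name();
    }
    public static class SimpleExecNode implements ExecNode {
        final String name;
        final List<ExecNode> inputs = new ArrayList<>();
        SimpleExecNode(String name) { this.name = name; }
        public List<ExecNode> getInputNodes() { return inputs; }
        public String name() { return name; }
    }

    // Each physical RelNode knows how to create its own ExecNode...
    public interface FlinkPhysicalRel {
        List<FlinkPhysicalRel> getInputs();
        ExecNode translateToExecNode();
    }
    public static class PhysicalScan implements FlinkPhysicalRel {
        public List<FlinkPhysicalRel> getInputs() { return List.of(); }
        public ExecNode translateToExecNode() { return new SimpleExecNode("Scan"); }
    }
    public static class PhysicalCalc implements FlinkPhysicalRel {
        final FlinkPhysicalRel input;
        public PhysicalCalc(FlinkPhysicalRel input) { this.input = input; }
        public List<FlinkPhysicalRel> getInputs() { return List.of(input); }
        public ExecNode translateToExecNode() { return new SimpleExecNode("Calc"); }
    }

    // ...so the generator only builds the graph: translate each rel,
    // then connect the translated inputs.
    public static ExecNode generate(FlinkPhysicalRel rel) {
        ExecNode node = rel.translateToExecNode();
        for (FlinkPhysicalRel input : rel.getInputs()) {
            node.getInputNodes().add(generate(input));
        }
        return node;
    }

    public static String describe(ExecNode root) {
        String inputs = root.getInputNodes().stream()
                .map(TranslateSketch::describe).collect(Collectors.joining(","));
        return inputs.isEmpty() ? root.name() : root.name() + "(" + inputs + ")";
    }
}
```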
[jira] [Created] (FLINK-20623) Separate the implementation of StreamExecWatermarkAssigner
godfrey he created FLINK-20623: -- Summary: Separate the implementation of StreamExecWatermarkAssigner Key: FLINK-20623 URL: https://issues.apache.org/jira/browse/FLINK-20623 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he
[jira] [Created] (FLINK-20622) Separate the implementation of StreamExecChangelogNormalize
godfrey he created FLINK-20622: -- Summary: Separate the implementation of StreamExecChangelogNormalize Key: FLINK-20622 URL: https://issues.apache.org/jira/browse/FLINK-20622 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he
[jira] [Created] (FLINK-20620) Port StreamExecPythonCalc to Java
godfrey he created FLINK-20620: -- Summary: Port StreamExecPythonCalc to Java Key: FLINK-20620 URL: https://issues.apache.org/jira/browse/FLINK-20620 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he
[jira] [Created] (FLINK-20610) Separate the implementation of BatchExecCalc and StreamExecCalc
godfrey he created FLINK-20610: -- Summary: Separate the implementation of BatchExecCalc and StreamExecCalc Key: FLINK-20610 URL: https://issues.apache.org/jira/browse/FLINK-20610 Project: Flink Issue Type: Sub-task Reporter: godfrey he
[jira] [Created] (FLINK-20609) Separate the implementation of BatchExecDataStreamScan and StreamExecDataStreamScan
godfrey he created FLINK-20609: -- Summary: Separate the implementation of BatchExecDataStreamScan and StreamExecDataStreamScan Key: FLINK-20609 URL: https://issues.apache.org/jira/browse/FLINK-20609 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he
[jira] [Created] (FLINK-20608) Separate the implementation of BatchExecLegacyTableSourceScan and StreamExecLegacyTableSourceScan
godfrey he created FLINK-20608: -- Summary: Separate the implementation of BatchExecLegacyTableSourceScan and StreamExecLegacyTableSourceScan Key: FLINK-20608 URL: https://issues.apache.org/jira/browse/FLINK-20608 Project: Flink Issue Type: Sub-task Reporter: godfrey he
[jira] [Created] (FLINK-20516) Separate the implementation of BatchExecTableSourceScan and StreamExecTableSourceScan
godfrey he created FLINK-20516: -- Summary: Separate the implementation of BatchExecTableSourceScan and StreamExecTableSourceScan Key: FLINK-20516 URL: https://issues.apache.org/jira/browse/FLINK-20516 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he
[jira] [Created] (FLINK-20515) Separate the implementation of BatchExecMultipleInput and StreamExecMultipleInput
godfrey he created FLINK-20515: -- Summary: Separate the implementation of BatchExecMultipleInput and StreamExecMultipleInput Key: FLINK-20515 URL: https://issues.apache.org/jira/browse/FLINK-20515 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he
[jira] [Created] (FLINK-20513) Separate the implementation of BatchExecExchange and StreamExecExchange
godfrey he created FLINK-20513: -- Summary: Separate the implementation of BatchExecExchange and StreamExecExchange Key: FLINK-20513 URL: https://issues.apache.org/jira/browse/FLINK-20513 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he This issue will separate the implementation of Batch(/Stream)ExecExchange: we will introduce Batch(/Stream)PhysicalExchange, which will extend {{FlinkPhysicalRel}}; {{BatchExecExchange}} will be moved into the `nodes.exec.batch` package and will implement ExecNode for Exchange; and {{StreamExecExchange}} will be moved into the `nodes.exec.stream` package.
[jira] [Created] (FLINK-20512) Introduce getDescription, getOutputType, replaceInputEdge methods for ExecNode
godfrey he created FLINK-20512: -- Summary: Introduce getDescription, getOutputType, replaceInputEdge methods for ExecNode Key: FLINK-20512 URL: https://issues.apache.org/jira/browse/FLINK-20512 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he We will introduce a {{getDescription()}} method that returns the description of the ExecNode, a {{getOutputType()}} method that returns the output {{RowType}} of the ExecNode, and a {{replaceInputEdge(int ordinalInParent, ExecEdge newInputEdge)}} method that updates the input edge at the given index. Note: {{RelNode}} also has a {{getDescription()}} method, so until all of the separation work is finished, we will introduce {{getDesc()}} instead of {{getDescription()}} to avoid a compile error.
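A minimal self-contained Java sketch of the proposed methods, with a stand-in `ExecEdge` that carries only a source name (the real classes hold much more state):

```java
import java.util.ArrayList;
import java.util.List;

public class ExecNodeApiSketch {
    // Stand-in edge; the real ExecEdge carries shuffle/damBehavior info etc.
    public static class ExecEdge {
        public final String source;
        public ExecEdge(String source) { this.source = source; }
    }

    public static class ExecNodeBase {
        private final String description;
        private final List<ExecEdge> inputEdges;
        public ExecNodeBase(String description, List<ExecEdge> inputEdges) {
            this.description = description;
            this.inputEdges = new ArrayList<>(inputEdges);
        }
        // named getDesc() for now to avoid clashing with RelNode#getDescription()
        public String getDesc() { return description; }
        // replaces the input edge at the given index
        public void replaceInputEdge(int ordinalInParent, ExecEdge newInputEdge) {
            inputEdges.set(ordinalInParent, newInputEdge);
        }
        public ExecEdge getInputEdge(int ordinalInParent) {
            return inputEdges.get(ordinalInParent);
        }
    }

    public static String replacedSource() {
        ExecNodeBase calc = new ExecNodeBase(
                "Calc(select=[a])", List.of(new ExecEdge("Scan")));
        calc.replaceInputEdge(0, new ExecEdge("Exchange"));
        return calc.getInputEdge(0).source;
    }
}
```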
[jira] [Created] (FLINK-20511) Deprecate the classes in scala `nodes.exec` package
godfrey he created FLINK-20511: -- Summary: Deprecate the classes in scala `nodes.exec` package Key: FLINK-20511 URL: https://issues.apache.org/jira/browse/FLINK-20511 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he As we will separate the implementation of {{RelNode}} and {{ExecNode}}, the new classes introduced for the {{ExecNode}} implementation will be written in Java. So the classes in the `nodes.exec` package will be deprecated: we will rename {{ExecNodeBase}} to {{LegacyExecNodeBase}}, {{BatchExecNode}} to {{LegacyBatchExecNode}}, and {{StreamExecNode}} to {{LegacyStreamExecNode}}.
[jira] [Created] (FLINK-20509) Refactor verifyPlan method in TableTestBase
godfrey he created FLINK-20509: -- Summary: Refactor verifyPlan method in TableTestBase Key: FLINK-20509 URL: https://issues.apache.org/jira/browse/FLINK-20509 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Currently, we use the {{verifyPlan}} method to verify both the {{RelNode}} plan and the {{ExecNode}} plan, because their instances are the same. But once the implementations of {{RelNode}} and {{ExecNode}} are separated, we can't get {{ESTIMATED_COST}} and {{CHANGELOG_MODE}} on the {{ExecNode}} plan. So, to make the methods clearer, the {{verifyPlan}} method can be split into two methods: {{verifyRelPlan}} for verifying the {{RelNode}} plan, and {{verifyExecPlan}} for verifying the {{ExecNode}} plan.
[jira] [Created] (FLINK-20490) Don't share inputs between FlinkPhysicalRel and ExecNode
godfrey he created FLINK-20490: -- Summary: Don't share inputs between FlinkPhysicalRel and ExecNode Key: FLINK-20490 URL: https://issues.apache.org/jira/browse/FLINK-20490 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Currently, all execution nodes extend both {{RelNode}} and {{ExecNode}}, and they share the same input instances, which are stored in the {{RelNode}} instance. {{ExecNode#getInputNodes}} just returns the cast {{RelNode}} inputs; the code looks like: {code:java} getInputs.map(_.asInstanceOf[ExecNode[_]]) {code} This issue aims to let {{ExecNode}} have its own input instances. It is a preparation for separating the implementations of {{RelNode}} and {{ExecNode}}.
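The target state, an ExecNode that owns its input list instead of borrowing the RelNode's inputs through a cast, can be sketched with self-contained stand-in classes (the setter name below is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class OwnInputsSketch {
    public static class ExecNodeBase {
        private final String name;
        // The node owns its inputs; no RelNode instance is consulted.
        private List<ExecNodeBase> inputNodes = new ArrayList<>();
        public ExecNodeBase(String name) { this.name = name; }
        // inputs are set once during translation, independent of any RelNode
        public void setInputNodes(List<ExecNodeBase> inputs) {
            this.inputNodes = new ArrayList<>(inputs);
        }
        public List<ExecNodeBase> getInputNodes() { return inputNodes; }
        public String getName() { return name; }
    }

    public static String firstInputName() {
        ExecNodeBase scan = new ExecNodeBase("Scan");
        ExecNodeBase calc = new ExecNodeBase("Calc");
        calc.setInputNodes(List.of(scan));
        return calc.getInputNodes().get(0).getName();
    }
}
```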
[jira] [Created] (FLINK-20478) Show "Optimized Execution Plan" instead of "Physical Execution Plan" in explain result
godfrey he created FLINK-20478: -- Summary: Show "Optimized Execution Plan" instead of "Physical Execution Plan" in explain result Key: FLINK-20478 URL: https://issues.apache.org/jira/browse/FLINK-20478 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Currently, the explain result includes "Abstract Syntax Tree", "Optimized Logical Plan" and "Physical Execution Plan". However, the "Optimized Logical Plan" is an {{ExecNode}} graph, while "[ExplainDetail|https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java]" represents the expected explain details, which currently include {{ESTIMATED_COST}} and {{CHANGELOG_MODE}}. Those types can only be used for Calcite {{RelNode}}s, not for {{ExecNode}}s. So I suggest the following adjustments: 1. Keep "Abstract Syntax Tree" as is; it represents the original (un-optimized) {{RelNode}} graph converted from {{SqlNode}}. 2. Rename "Optimized Logical Plan" to "Optimized Physical Plan", which represents the optimized physical {{RelNode}} graph composed of {{FlinkPhysicalRel}}s. {{ESTIMATED_COST}} and {{CHANGELOG_MODE}} describe the expected explain details for "Optimized Physical Plan". 3. Replace "Physical Execution Plan" with "Optimized Execution Plan", which represents the optimized {{ExecNode}} graph. Currently, many optimizations are based on the {{ExecNode}} graph, such as sub-plan reuse and multiple-input rewriting, and we may introduce more in the future, so the differences between "Optimized Physical Plan" and "Optimized Execution Plan" will keep growing. We do not want to show two execution plans, and a "Physical Execution Plan" for the {{StreamGraph}} is less important than the "Optimized Execution Plan". If we want to introduce "Physical Execution Plan" again in the future, we can add a type named "PHYSICAL_EXECUTION_PLAN" to {{ExplainDetail}} to support it.
There is already an issue to do similar things: [FLINK-19687|https://issues.apache.org/jira/browse/FLINK-19687]
[jira] [Created] (FLINK-20437) Port ExecNode to Java
godfrey he created FLINK-20437: -- Summary: Port ExecNode to Java Key: FLINK-20437 URL: https://issues.apache.org/jira/browse/FLINK-20437 Project: Flink Issue Type: Sub-task Reporter: godfrey he
[jira] [Created] (FLINK-20436) Simplify type parameter of ExecNode
godfrey he created FLINK-20436: -- Summary: Simplify type parameter of ExecNode Key: FLINK-20436 URL: https://issues.apache.org/jira/browse/FLINK-20436 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Assignee: godfrey he
[jira] [Created] (FLINK-20435) Refactor ExecNode
godfrey he created FLINK-20435: -- Summary: Refactor ExecNode Key: FLINK-20435 URL: https://issues.apache.org/jira/browse/FLINK-20435 Project: Flink Issue Type: Improvement Reporter: godfrey he Currently, there are several planned improvements to ExecNode: 1. Simplify the type parameter of {{ExecNode}}. Currently, {{ExecNode#translateToPlan}} takes {{BatchPlanner}} or {{StreamPlanner}} as a parameter, so {{ExecNode}} has a type parameter {{E <: Planner}}, which indicates whether the node is a batch node or a streaming node. In the future, however, a plan may contain both batch nodes and stream nodes. The type parameter can be removed, and the implementing base class can cast the planner to the expected planner. 2. Port ExecNode to Java. 3. Separate the implementations of {{RelNode}} and {{ExecNode}}. Currently, an execution node extends both {{RelNode}} and {{ExecNode}}. After a physical node is converted to an exec node, many parameters are unnecessary, such as RelOptCluster, RelTraitSet, etc. With more optimizations on {{ExecNode}}, we need {{ExecNode}} to be cleaner and simpler, so we will separate the implementations of {{RelNode}} and {{ExecNode}}. This is an umbrella issue; we will create more related sub-tasks.
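Improvement 1 above (dropping the planner type parameter and casting inside the node) can be sketched in self-contained Java with simplified stand-ins for the planner classes:

```java
public class PlannerParamSketch {
    // Simplified stand-ins for the planner classes (not Flink's real API).
    public interface Planner {}
    public static class BatchPlanner implements Planner {}
    public static class StreamPlanner implements Planner {}

    // After the change, ExecNode no longer carries an `E extends Planner`
    // type parameter; translateToPlan takes the common Planner interface...
    public interface ExecNode<T> {
        String translateToPlan(Planner planner);
    }

    // ...and a batch node casts to the planner it expects internally.
    public static class BatchExecCalc implements ExecNode<Object> {
        public String translateToPlan(Planner planner) {
            BatchPlanner batchPlanner = (BatchPlanner) planner;
            return "batch plan via " + batchPlanner.getClass().getSimpleName();
        }
    }

    public static String demo() {
        return new BatchExecCalc().translateToPlan(new BatchPlanner());
    }
}
```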
[jira] [Created] (FLINK-20322) Rename table.optimizer.union-all-as-breakpoint-disabled to table.optimizer.union-all-as-breakpoint.enabled
godfrey he created FLINK-20322: -- Summary: Rename table.optimizer.union-all-as-breakpoint-disabled to table.optimizer.union-all-as-breakpoint.enabled Key: FLINK-20322 URL: https://issues.apache.org/jira/browse/FLINK-20322 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.12.0 Reporter: godfrey he Fix For: 1.12.0 {{table.optimizer.union-all-as-breakpoint-disabled}} is defined in {{RelNodeBlockPlanBuilder}} and is an internal experimental config. A {{disabled}} key with {{false}} as the default value is very obscure. I suggest changing {{table.optimizer.union-all-as-breakpoint-disabled}} to {{table.optimizer.union-all-as-breakpoint-enabled}} with {{true}} as the default value, which is easier to understand. As this config is an internal experimental config, we don't mark it as deprecated.
[jira] [Created] (FLINK-20293) Remove the useless constructor of ContinuousFileMonitoringFunction
godfrey he created FLINK-20293: -- Summary: Remove the useless constructor of ContinuousFileMonitoringFunction Key: FLINK-20293 URL: https://issues.apache.org/jira/browse/FLINK-20293 Project: Flink Issue Type: Improvement Components: API / DataStream Affects Versions: 1.12.0 Reporter: godfrey he Fix For: 1.12.0 In 1.11, we introduced a {{ContinuousFileMonitoringFunction}} constructor with a given {{globalModificationTime}} to support {{HiveTableSource}} continuously reading non-partitioned files. This feature has a bug, see [FLINK-20277|https://issues.apache.org/jira/browse/FLINK-20277]. In master, after [FLINK-19888|https://issues.apache.org/jira/browse/FLINK-19888] was finished, {{HiveTableSource}} no longer depends on {{ContinuousFileMonitoringFunction}}, so we can revert the ContinuousFileMonitoringFunction part of [FLINK-17435|https://issues.apache.org/jira/browse/FLINK-17435] to avoid this bug.
[jira] [Created] (FLINK-20255) Nested decorrelate failed
godfrey he created FLINK-20255: -- Summary: Nested decorrelate failed Key: FLINK-20255 URL: https://issues.apache.org/jira/browse/FLINK-20255 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.11.0, 1.12.0 Reporter: godfrey he This issue is from the ML: https://www.mail-archive.com/user@flink.apache.org/msg37746.html We can reproduce the issue with the following code:
{code:java}
@FunctionHint(output = new DataTypeHint("ROW"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim)))
    }
  }
}

object Job {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
    streamTableEnv.createTemporarySystemFunction(
      "SplitStringToRows",
      classOf[SplitStringToRows]
    ) // Class defined in previous email
    streamTableEnv.executeSql(
      """
        CREATE TABLE table2 (
          attr1 STRING,
          attr2 STRING,
          attr3 DECIMAL,
          attr4 DATE
        ) WITH (
          'connector' = 'datagen'
        )""")
    val q2 = streamTableEnv.sqlQuery(
      """
        SELECT a.attr1 AS attr1, attr2, attr3, attr4
        FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1, ';')) AS a(attr1)
      """)
    streamTableEnv.createTemporaryView("view2", q2)
    val q3 =
      """
        SELECT w.attr1, p.attr3
        FROM table2 w
        LEFT JOIN LATERAL (
          SELECT attr1, attr3
          FROM (
            SELECT attr1, attr3,
              ROW_NUMBER() OVER (
                PARTITION BY attr1
                ORDER BY attr4 DESC NULLS LAST, w.attr2 = attr2 DESC NULLS LAST
              ) AS row_num
            FROM view2)
          WHERE row_num = 1) p
        ON (w.attr1 = p.attr1)
      """
    println(streamTableEnv.explainSql(q3))
  }
}
{code}
The reason is that {{RelDecorrelator}} in Calcite can't handle such a nested decorrelation pattern yet.
[jira] [Created] (FLINK-20070) NPE in SourceCoordinatorProviderTest.testCheckpointAndReset
godfrey he created FLINK-20070: -- Summary: NPE in SourceCoordinatorProviderTest.testCheckpointAndReset Key: FLINK-20070 URL: https://issues.apache.org/jira/browse/FLINK-20070 Project: Flink Issue Type: Bug Components: Connectors / Common Reporter: godfrey he Fix For: 1.12.0 https://dev.azure.com/godfreyhe/c147b7ad-1708-46c3-9021-cc523e50c4d5/_apis/build/builds/71/logs/114
{code:java}
2020-11-10T03:41:10.8231846Z [INFO] Running org.apache.flink.runtime.source.coordinator.SourceCoordinatorContextTest
2020-11-10T03:41:11.2510061Z [ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.171 s <<< FAILURE! - in org.apache.flink.runtime.source.coordinator.SourceCoordinatorProviderTest
2020-11-10T03:41:11.2511837Z [ERROR] testCheckpointAndReset(org.apache.flink.runtime.source.coordinator.SourceCoordinatorProviderTest) Time elapsed: 1.055 s <<< ERROR!
2020-11-10T03:41:11.2512610Z java.lang.NullPointerException
2020-11-10T03:41:11.2513268Z at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProviderTest.testCheckpointAndReset(SourceCoordinatorProviderTest.java:94)
2020-11-10T03:41:11.2513967Z at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-11-10T03:41:11.2514553Z at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-11-10T03:41:11.2515230Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-11-10T03:41:11.2515827Z at java.lang.reflect.Method.invoke(Method.java:498)
2020-11-10T03:41:11.2516428Z at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
2020-11-10T03:41:11.2517107Z at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2020-11-10T03:41:11.2517757Z at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
2020-11-10T03:41:11.2518431Z at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2020-11-10T03:41:11.2519082Z at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
2020-11-10T03:41:11.2519677Z at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
2020-11-10T03:41:11.2520292Z at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
2020-11-10T03:41:11.2521100Z at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
2020-11-10T03:41:11.2521831Z at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
2020-11-10T03:41:11.2522420Z at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
2020-11-10T03:41:11.2522988Z at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
2020-11-10T03:41:11.2523582Z at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
2020-11-10T03:41:11.2524165Z at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
2020-11-10T03:41:11.2524951Z at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
2020-11-10T03:41:11.2525570Z at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
2020-11-10T03:41:11.2526288Z at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
2020-11-10T03:41:11.2526969Z at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
2020-11-10T03:41:11.2527742Z at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
2020-11-10T03:41:11.2528467Z at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
2020-11-10T03:41:11.2529169Z at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
2020-11-10T03:41:11.2529844Z at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
2020-11-10T03:41:11.2530480Z at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}
[jira] [Created] (FLINK-19925) Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
godfrey he created FLINK-19925: -- Summary: Errors$NativeIoException: readAddress(..) failed: Connection reset by peer Key: FLINK-19925 URL: https://issues.apache.org/jira/browse/FLINK-19925 Project: Flink Issue Type: Bug Components: Runtime / Network Reporter: godfrey he An Errors$NativeIoException sometimes occurs when we run TPC-DS on master; the full exception stack is
{code:java}
Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: readAddress(..) failed: Connection reset by peer (connection to 'xxx')
	at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:818) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at java.lang.Thread.run(Thread.java:834) ~[?:1.8.0_102]
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
{code}
[jira] [Created] (FLINK-19822) Remove redundant shuffle for stream
godfrey he created FLINK-19822: -- Summary: Remove redundant shuffle for stream Key: FLINK-19822 URL: https://issues.apache.org/jira/browse/FLINK-19822 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Assignee: godfrey he Fix For: 1.12.0
[jira] [Created] (FLINK-19737) Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG
godfrey he created FLINK-19737: -- Summary: Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG Key: FLINK-19737 URL: https://issues.apache.org/jira/browse/FLINK-19737 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Assignee: godfrey he Fix For: 1.12.0 {{Transformation}} is not serializable, while {{StreamOperatorFactory}} is. We need to introduce a class (named {{TableOperatorWrapper}}) to store the information of a {{Transformation}}, and a utility class (named {{TableOperatorWrapperGenerator}}) to convert the {{Transformation}} DAG into a {{TableOperatorWrapper}} DAG.
[jira] [Created] (FLINK-18853) Supports properties from flink-conf.yaml for SET command in sql client
godfrey he created FLINK-18853: -- Summary: Supports properties from flink-conf.yaml for SET command in sql client Key: FLINK-18853 URL: https://issues.apache.org/jira/browse/FLINK-18853 Project: Flink Issue Type: New Feature Components: Table SQL / Client Reporter: godfrey he Many sql-client-specific properties can be replaced with config options from flink-conf.yaml; for example, {{execution.parallelism}} can be replaced with {{parallelism.default}}. The SQL client currently does not support many properties from flink-conf.yaml in the SET command, such as {{state.backend}}. As discussed in [FLINK-18161|https://issues.apache.org/jira/browse/FLINK-18161], we can deprecate the sql-client-specific properties and support all properties from flink-conf.yaml. If there is a conflict between a deprecated property and a new property, an exception is thrown.
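One possible shape of the key mapping and conflict check, as a self-contained Java sketch; the only mapping named in the issue is execution.parallelism -> parallelism.default, and the method names here are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class SetCommandSketch {
    // Hypothetical mapping from deprecated sql-client keys to
    // flink-conf.yaml options (only the first pair comes from the issue).
    private static final Map<String, String> DEPRECATED_TO_NEW =
            Map.of("execution.parallelism", "parallelism.default");

    // Applies a SET command: resolve deprecated keys, reject conflicts.
    public static String set(Map<String, String> session, String key, String value) {
        String newKey = DEPRECATED_TO_NEW.getOrDefault(key, key);
        if (!newKey.equals(key) && session.containsKey(newKey)
                && !session.get(newKey).equals(value)) {
            // deprecated key and new key disagree: fail loudly
            throw new IllegalArgumentException(
                    "Conflict between deprecated key '" + key + "' and '" + newKey + "'");
        }
        session.put(newKey, value);
        return newKey;  // the key actually stored in the session config
    }

    public static String demo() {
        Map<String, String> session = new HashMap<>();
        return set(session, "execution.parallelism", "4");
    }
}
```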
[jira] [Created] (FLINK-18791) Simplify the logic of SqlCommandParser and CliClient
godfrey he created FLINK-18791: -- Summary: Simplify the logic of SqlCommandParser and CliClient Key: FLINK-18791 URL: https://issues.apache.org/jira/browse/FLINK-18791 Project: Flink Issue Type: Improvement Components: Table SQL / Client Reporter: godfrey he Fix For: 1.12.0 Currently, {{SqlCommandParser}} parses a statement into a corresponding {{SqlCommandCall}}, and {{CliClient}} performs a different action for each kind of {{SqlCommandCall}}. However, when a new kind of statement is supported (such as [SHOW CURRENT DDL|https://issues.apache.org/jira/browse/FLINK-18616] or [SHOW CREATE TABLE|https://issues.apache.org/jira/browse/FLINK-16384]), we must implement it in the planner and at the same time add a new kind of {{SqlCommand}} and a new method in {{CliClient}} to execute the command in the SQL client. Moreover, the sql-client part may be forgotten (such as in [FLINK-18059|https://issues.apache.org/jira/browse/FLINK-18059]). To improve this, I propose that {{SqlCommandParser}} return an {{Operation}} and that {{CliClient}} execute the {{Operation}} directly. Meanwhile, we can unify the print style of the different kinds of Operations. Once this is finished, we will not need to change the SQL client when a new kind of statement is supported, unless a customized print style is needed.
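The proposed design, where the client executes whatever the parser returns instead of switching on a command enum, can be sketched with self-contained stand-ins (the `Operation` and `parse` shapes below are simplified assumptions, not the real sql-client API):

```java
public class OperationSketch {
    // The parser returns an Operation; the client executes it directly,
    // so no per-command enum or per-command CliClient method is needed.
    public interface Operation {
        String execute();
    }

    // Hypothetical parser producing ready-to-execute operations.
    public static Operation parse(String statement) {
        if (statement.equalsIgnoreCase("SHOW TABLES")) {
            return () -> "table1\ntable2";
        }
        // fallback for statements this sketch does not model
        return () -> "OK";
    }

    // The CliClient loop reduces to: parse, then execute.
    public static String run(String statement) {
        return parse(statement).execute();
    }
}
```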
[jira] [Created] (FLINK-18789) Use TableEnvironment#executeSql to execute insert statement in sql client
godfrey he created FLINK-18789: -- Summary: Use TableEnvironment#executeSql to execute insert statement in sql client Key: FLINK-18789 URL: https://issues.apache.org/jira/browse/FLINK-18789 Project: Flink Issue Type: Sub-task Components: Table SQL / Client Reporter: godfrey he Fix For: 1.12.0 Currently, the SQL client has a lot of logic for executing an insert job, which can be simplified by using the {{executeSql}} method.
[jira] [Created] (FLINK-18788) Refactor sql-client based on the proposed interfaces of FLIP-84
godfrey he created FLINK-18788: -- Summary: Refactor sql-client based on the proposed interfaces of FLIP-84 Key: FLINK-18788 URL: https://issues.apache.org/jira/browse/FLINK-18788 Project: Flink Issue Type: Task Components: Table SQL / Client Reporter: godfrey he Fix For: 1.12.0 FLIP-84 introduces many useful methods, such as {{TableEnvironment#executeSql}} and {{TableResult#collect}}. The SQL client can use those methods to simplify many implementations; for example, many methods in the {{Executor}} interface can be replaced with {{executeSql}}, and the select implementation can be replaced with {{collect}}. This issue aims to do the related refactoring.
[jira] [Created] (FLINK-18731) The monotonicity of UNIX_TIMESTAMP function is not correct
godfrey he created FLINK-18731: -- Summary: The monotonicity of UNIX_TIMESTAMP function is not correct Key: FLINK-18731 URL: https://issues.apache.org/jira/browse/FLINK-18731 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.12.0 Currently, the monotonicity of the {{UNIX_TIMESTAMP}} function is always {{INCREASING}}. Actually, only the zero-argument form ({{UNIX_TIMESTAMP()}}, which is equivalent to {{NOW}}) is INCREASING; otherwise (e.g. UNIX_TIMESTAMP(string)) its monotonicity should be NOT_MONOTONIC. -- This message was sent by Atlassian Jira (v8.3.4#803005)
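The proposed rule can be sketched as a pure function of the argument count. This is an illustrative stand-in, not Calcite's actual SqlMonotonicity API: zero arguments means "current wall-clock time", which only moves forward, while `UNIX_TIMESTAMP(string)` depends on its input and is therefore not monotonic.

```java
// Hedged sketch (not Flink's or Calcite's actual monotonicity API): derive the
// monotonicity of UNIX_TIMESTAMP from its argument count, as the issue proposes.
public class UnixTimestampMonotonicity {

    public enum Monotonicity {
        INCREASING,
        NOT_MONOTONIC
    }

    /** Zero arguments means "current time", which only moves forward. */
    public static Monotonicity of(int argumentCount) {
        return argumentCount == 0
                ? Monotonicity.INCREASING
                : Monotonicity.NOT_MONOTONIC;
    }
}
```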
[jira] [Created] (FLINK-18717) reuse MiniCluster in table integration test class?
godfrey he created FLINK-18717: -- Summary: reuse MiniCluster in table integration test class? Key: FLINK-18717 URL: https://issues.apache.org/jira/browse/FLINK-18717 Project: Flink Issue Type: Improvement Components: Table SQL / Legacy Planner, Table SQL / Planner Affects Versions: 1.11.0 Reporter: godfrey he Before 1.11, the {{MiniCluster}} could be reused within each integration test class (see TestStreamEnvironment#setAsContext). In 1.11, after we corrected the execution behavior of TableEnvironment, StreamTableEnvironment and BatchTableEnvironment (see [FLINK-16363|https://issues.apache.org/jira/browse/FLINK-16363], [FLINK-17126|https://issues.apache.org/jira/browse/FLINK-17126]), a MiniCluster is created for each test case, even within the same test class (see {{org.apache.flink.client.deployment.executors.LocalExecutor}}). It would be better if we could reuse the {{MiniCluster}} as before. One approach is to provide a new kind of MiniCluster factory (such as a SessionMiniClusterFactory) instead of using {{PerJobMiniClusterFactory}}. WDYT? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18651) implicitly cast the time attribute to regular TIMESTAMP type in regular join
godfrey he created FLINK-18651: -- Summary: implicitly cast the time attribute to regular TIMESTAMP type in regular join Key: FLINK-18651 URL: https://issues.apache.org/jira/browse/FLINK-18651 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: godfrey he Currently, a regular join does not accept a rowtime attribute field as input, and requires users to manually cast the time attribute to a regular timestamp. This is because time attributes become out of order after a regular join, so we cannot do a window aggregate based on the time attribute afterwards. We can improve this so that the planner implicitly casts the time attribute to a regular TIMESTAMP type, and throws an exception if there is an operator (after the join) that depends on the time attribute, such as a window aggregate. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18621) Simplify the methods of Executor interface in sql client
godfrey he created FLINK-18621: -- Summary: Simplify the methods of Executor interface in sql client Key: FLINK-18621 URL: https://issues.apache.org/jira/browse/FLINK-18621 Project: Flink Issue Type: Improvement Components: Table SQL / Client Reporter: godfrey he Fix For: 1.12.0 After {{TableEnvironment#executeSql}} was introduced, many methods in the {{Executor}} interface can be replaced with {{TableEnvironment#executeSql}}. These methods include: listCatalogs, listDatabases, createTable, dropTable, listTables, listFunctions, useCatalog, useDatabase, and getTableSchema (use `DESCRIBE xx`). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18550) use TableResult#collect to get select result for sql client
godfrey he created FLINK-18550: -- Summary: use TableResult#collect to get select result for sql client Key: FLINK-18550 URL: https://issues.apache.org/jira/browse/FLINK-18550 Project: Flink Issue Type: Improvement Components: Table SQL / Client Reporter: godfrey he Fix For: 1.12.0 Currently, the SQL client contains a lot of logic for handling select results, which can be simplified by using the {{TableResult#collect}} method. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18399) TableResult#print can not print the result of unbounded stream query
godfrey he created FLINK-18399: -- Summary: TableResult#print can not print the result of unbounded stream query Key: FLINK-18399 URL: https://issues.apache.org/jira/browse/FLINK-18399 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.11.0 Reporter: godfrey he Fix For: 1.11.0 In the current implementation of PrintUtils, all results are first collected into local memory to compute the column widths. This works fine for batch queries and bounded stream queries, but for an unbounded stream query the result is endless, so nothing would ever be printed. To solve this, we can use a fixed-length strategy and print each row immediately once it is accessed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
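The fixed-length strategy can be sketched as follows. This is a minimal illustration, not Flink's PrintUtils: the column width of 20 and the `...` truncation marker are assumptions made for the sketch. The key property is that `print` consumes the iterator row by row and never buffers it, so it also works for an endless stream.

```java
// Minimal sketch of the proposed fixed-width printing strategy: column widths
// are fixed up front, and each row is printed as soon as it arrives.
// WIDTH and the truncation marker are assumptions, not Flink's actual values.
import java.util.Iterator;
import java.util.List;

public class FixedWidthPrinter {

    public static final int WIDTH = 20;

    /** Pad or truncate a cell to the fixed column width. */
    public static String cell(String value) {
        if (value.length() > WIDTH) {
            return value.substring(0, WIDTH - 3) + "...";
        }
        return String.format("%-" + WIDTH + "s", value);
    }

    public static String formatRow(List<String> row) {
        StringBuilder sb = new StringBuilder("|");
        for (String v : row) {
            sb.append(' ').append(cell(v)).append(" |");
        }
        return sb.toString();
    }

    /** Print rows one by one; never buffers the (possibly endless) iterator. */
    public static void print(Iterator<List<String>> rows) {
        while (rows.hasNext()) {
            System.out.println(formatRow(rows.next()));
        }
    }
}
```

The trade-off versus the current behavior is that columns are no longer sized to the widest value in the result, since that value can never be known for an unbounded query.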
[jira] [Created] (FLINK-18337) Introduce TableResult#await method to wait data ready
godfrey he created FLINK-18337: -- Summary: Introduce TableResult#await method to wait data ready Key: FLINK-18337 URL: https://issues.apache.org/jira/browse/FLINK-18337 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: godfrey he Currently, the {{TableEnvironment.executeSql()}} method for an INSERT statement returns a TableResult once the job is submitted. Users must use {{tableResult.getJobClient.get().getJobExecutionResult(Thread.currentThread().getContextClassLoader).get()}} to wait for the job to finish. This API looks very ugly. So this issue aims to introduce a {{TableResult#await}} method; the code snippet looks like:
{code:java}
val tEnv = ...
// submit the job and wait for the job to finish
tEnv.executeSql("insert into ...").await()
{code}
The suggested new methods are:
{code:java}
/**
 * Wait until the data is ready.
 *
 * For a select operation, this method waits until the first row can be accessed locally.
 * For an insert operation, this method waits for the job to finish, because the result contains only one row.
 * For other operations, this method returns immediately, because the result is already available locally.
 *
 * @throws ExecutionException if this future completed exceptionally
 * @throws InterruptedException if the current thread was interrupted while waiting
 */
void await() throws InterruptedException, ExecutionException;

/**
 * Wait until the data is ready.
 *
 * For a select operation, this method waits until the first row can be accessed locally.
 * For an insert operation, this method waits for the job to finish, because the result contains only one row.
 * For other operations, this method returns immediately, because the result is already available locally.
 *
 * @param timeout the maximum time to wait
 * @param unit the time unit of the timeout argument
 * @throws ExecutionException if this future completed exceptionally
 * @throws InterruptedException if the current thread was interrupted while waiting
 * @throws TimeoutException if the wait timed out
 */
void await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
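A natural way to back such methods is a completion future, which already has exactly this blocking contract. The sketch below is an assumption about the implementation shape, not Flink's actual internals; the class and field names are invented for illustration.

```java
// Hedged sketch: TableResult#await could simply delegate to a future that
// completes when the data is ready (first row for SELECT, job end for INSERT).
// AwaitSketch and resultReady are illustrative names, not Flink's internals.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AwaitSketch {

    private final CompletableFuture<Void> resultReady;

    public AwaitSketch(CompletableFuture<Void> resultReady) {
        this.resultReady = resultReady;
    }

    /** Block until the data is ready. */
    public void await() throws InterruptedException, ExecutionException {
        resultReady.get();
    }

    /** Block until the data is ready, or give up after the given timeout. */
    public void await(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        resultReady.get(timeout, unit);
    }
}
```

Delegating to `CompletableFuture#get` is what yields the exact exception signatures proposed above (`ExecutionException`, `InterruptedException`, and `TimeoutException` for the timed variant).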
[jira] [Created] (FLINK-18180) unify the logic of time attribute derivation for both batch and streaming
godfrey he created FLINK-18180: -- Summary: unify the logic of time attribute derivation for both batch and streaming Key: FLINK-18180 URL: https://issues.apache.org/jira/browse/FLINK-18180 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: godfrey he Currently, the logic of time attribute derivation differs between batch and streaming. For a batch table source, the rowtime type is not generated, or is erased to a regular timestamp type, if the source table has a rowtime column. To handle this difference, we have to distinguish batch from streaming via the {{isStreamingMode}} flag in many places, such as {{DatabaseCalciteSchema}}, {{CatalogSchemaTable}}, {{CatalogTableSchemaResolver}}, etc. In fact, batch queries may also need the rowtime type, for example to support rowtime temporal joins. So we can unify the logic of time attribute derivation on the source side, and erase the rowtime type if needed in the optimization phase. This also makes it easier to push the unified {{TableEnvironment}} and planner forward. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18061) TableResult#collect should return closeable iterator to avoid resource leak
godfrey he created FLINK-18061: -- Summary: TableResult#collect should return closeable iterator to avoid resource leak Key: FLINK-18061 URL: https://issues.apache.org/jira/browse/FLINK-18061 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: godfrey he Fix For: 1.11.0 As discussed on the ML: http://mail-archives.apache.org/mod_mbox/flink-dev/202005.mbox/%3CCADQYLGuk%2BnnrPv-PR6Gi7D_rZqp_DhjfA%3DVtkRB-aGPxYxOQPw%40mail.gmail.com%3E, we should return a closeable iterator from the TableResult#collect method *to avoid resource leaks*. The suggested change is:
{code:java}
public interface TableResult {
    CloseableRowIterator collect();
}

public interface CloseableRowIterator extends Iterator, AutoCloseable {
}
{code}
This change does not break the current API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
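The point of extending both `Iterator` and `AutoCloseable` is that callers can use try-with-resources, so the underlying job resources are released even when iteration throws. The sketch below is illustrative, not Flink's final API; the `wrap` helper and the `closed` flag exist only to make the close behavior observable.

```java
// Illustrative sketch of a closeable row iterator and why it prevents leaks:
// close() runs even if the consuming code exits early or throws.
// CloseableRowIterator here is a stand-in, not Flink's final interface.
import java.util.Iterator;

public class CloseableIteratorDemo {

    public interface CloseableRowIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close(); // narrowed: no checked exception in this sketch
    }

    /** Wrap a plain iterator and record whether close() was called. */
    public static <T> CloseableRowIterator<T> wrap(Iterator<T> it, boolean[] closed) {
        return new CloseableRowIterator<T>() {
            public boolean hasNext() { return it.hasNext(); }
            public T next() { return it.next(); }
            public void close() { closed[0] = true; } // would release cluster resources
        };
    }
}
```

Typical usage would be `try (CloseableRowIterator<...> rows = tableResult.collect()) { ... }`, which guarantees `close()` runs on every exit path.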
[jira] [Created] (FLINK-18059) Can not execute create/drop catalog statement in sql client
godfrey he created FLINK-18059: -- Summary: Can not execute create/drop catalog statement in sql client Key: FLINK-18059 URL: https://issues.apache.org/jira/browse/FLINK-18059 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.10.1 Reporter: godfrey he When executing a create catalog statement (e.g. {{create CATALOG c1 with('type'='generic_in_memory')}}) in the SQL client, the following exception occurs:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unsupported command: CREATE CATALOG
at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:355)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:213)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
The same happens for {{drop catalog}}. The reason is that the CliClient class does not handle the CREATE_CATALOG and DROP_CATALOG commands. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18055) Catalog does not exist in SQL Client
godfrey he created FLINK-18055: -- Summary: Catalog does not exist in SQL Client Key: FLINK-18055 URL: https://issues.apache.org/jira/browse/FLINK-18055 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.11.0 Reporter: godfrey he
Flink SQL> show catalogs;
default_catalog
hive
Flink SQL> use catalog hive;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name [`hive`] does not exist.
The reason is that {{SqlCommandParser}} adds backticks ({{``}}) around the catalog name, which is unnecessary. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17996) NPE in CatalogTableStatisticsConverter.convertToColumnStats method
godfrey he created FLINK-17996: -- Summary: NPE in CatalogTableStatisticsConverter.convertToColumnStats method Key: FLINK-17996 URL: https://issues.apache.org/jira/browse/FLINK-17996 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.11.0 Reporter: godfrey he Currently, the Hive catalog only supports a few kinds of statistics and returns null for the others (see HiveStatsUtil#createTableColumnStats). If there are decimal statistics, an NPE occurs in the CatalogTableStatisticsConverter.convertToColumnStats method:
Caused by: java.lang.NullPointerException
at org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter.convertToColumnStats(CatalogTableStatisticsConverter.java:77)
at org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter.convertToColumnStatsMap(CatalogTableStatisticsConverter.java:68)
at org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter.convertToTableStats(CatalogTableStatisticsConverter.java:57)
at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.extractTableStats(DatabaseCalciteSchema.java:113)
at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getStatistic(DatabaseCalciteSchema.java:97)
at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:74)
at java.util.Optional.map(Optional.java:215)
at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:72)
at org.apache.calcite.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
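The shape of the fix is a null guard: entries the catalog could not provide (such as the decimal statistics above) must be skipped rather than dereferenced. The sketch below only shows that guard; `convertToColumnStats`'s real signature in Flink takes catalog statistics objects, not the simplified map used here.

```java
// Hedged sketch of the defensive fix: skip statistics whose value is null
// instead of dereferencing them. The Map<String, Long> signature is a
// simplification for illustration, not Flink's actual converter API.
import java.util.HashMap;
import java.util.Map;

public class ColumnStatsConverter {

    /** Convert only the entries the catalog could actually provide. */
    public static Map<String, Long> convertToColumnStats(Map<String, Long> catalogStats) {
        Map<String, Long> result = new HashMap<>();
        if (catalogStats == null) {
            return result; // catalog provided no statistics at all
        }
        for (Map.Entry<String, Long> e : catalogStats.entrySet()) {
            if (e.getValue() != null) { // e.g. unsupported decimal stats come back null
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }
}
```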
[jira] [Created] (FLINK-17774) Support all kinds of changes for select result
godfrey he created FLINK-17774: -- Summary: Support all kinds of changes for select result Key: FLINK-17774 URL: https://issues.apache.org/jira/browse/FLINK-17774 Project: Flink Issue Type: Sub-task Components: Table SQL / Legacy Planner, Table SQL / Planner Reporter: godfrey he Fix For: 1.11.0 [FLINK-17252|https://issues.apache.org/jira/browse/FLINK-17252] has supported select queries, but only append changes are supported, because [FLINK-16998|https://issues.apache.org/jira/browse/FLINK-16998] is not finished. This issue aims to support all kinds of changes based on FLINK-16998. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17753) watermark defined in ddl does not work in Table api
godfrey he created FLINK-17753: -- Summary: watermark defined in ddl does not work in Table api Key: FLINK-17753 URL: https://issues.apache.org/jira/browse/FLINK-17753 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.11.0 The following code gets {{org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.}}
{code:scala}
@Test
def testRowTimeTableSourceGroupWindow(): Unit = {
  val ddl =
    s"""
       |CREATE TABLE rowTimeT (
       |  id int,
       |  rowtime timestamp(3),
       |  val bigint,
       |  name varchar(32),
       |  watermark for rowtime as rowtime
       |) WITH (
       |  'connector' = 'projectable-values',
       |  'bounded' = 'false'
       |)
     """.stripMargin
  util.tableEnv.executeSql(ddl)

  val t = util.tableEnv.from("rowTimeT")
    .where($"val" > 100)
    .window(Tumble over 10.minutes on 'rowtime as 'w)
    .groupBy('name, 'w)
    .select('name, 'w.end, 'val.avg)
  util.verifyPlan(t)
}
{code}
The reason is that the planner does not convert the {{watermarkSpecs}} in {{TableSchema}} to the correct type when {{tableEnv.from}} is called. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17751) proctime defined in ddl can't work with over window in Table api
godfrey he created FLINK-17751: -- Summary: proctime defined in ddl can't work with over window in Table api Key: FLINK-17751 URL: https://issues.apache.org/jira/browse/FLINK-17751 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.11.0 The following test gets {{org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute.}}
{code:scala}
@Test
def testProcTimeTableSourceOverWindow(): Unit = {
  val ddl =
    s"""
       |CREATE TABLE procTimeT (
       |  id int,
       |  val bigint,
       |  name varchar(32),
       |  proctime as PROCTIME()
       |) WITH (
       |  'connector' = 'projectable-values',
       |  'bounded' = 'false'
       |)
     """.stripMargin
  util.tableEnv.executeSql(ddl)

  val t = util.tableEnv.from("procTimeT")
    .window(Over partitionBy 'id orderBy 'proctime preceding 2.hours as 'w)
    .select('id, 'name, 'val.sum over 'w as 'valSum)
    .filter('valSum > 100)
  util.verifyPlan(t)
}
{code}
The reason is that the type of proctime is {{TIMESTAMP(3) NOT NULL}}, while {{LegacyTypeInfoDataTypeConverter}} does not handle the mapping between {{Types.LOCAL_DATE_TIME}} and {{DataTypes.TIMESTAMP(3)}} with NOT NULL. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17728) use sql parser to parse a statement in sql client
godfrey he created FLINK-17728: -- Summary: use sql parser to parse a statement in sql client Key: FLINK-17728 URL: https://issues.apache.org/jira/browse/FLINK-17728 Project: Flink Issue Type: Improvement Components: Table SQL / Client Reporter: godfrey he Fix For: 1.11.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17405) add test cases for cancel job in SQL client
godfrey he created FLINK-17405: -- Summary: add test cases for cancel job in SQL client Key: FLINK-17405 URL: https://issues.apache.org/jira/browse/FLINK-17405 Project: Flink Issue Type: Improvement Components: Table SQL / Client Reporter: godfrey he Fix For: 1.11.0, 1.10.2 As discussed in [FLINK-15669|https://issues.apache.org/jira/browse/FLINK-15669], we can re-add some tests to verify the job cancellation logic in the SQL client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17399) CsvTableSink should also extend from OverwritableTableSink
godfrey he created FLINK-17399: -- Summary: CsvTableSink should also extend from OverwritableTableSink Key: FLINK-17399 URL: https://issues.apache.org/jira/browse/FLINK-17399 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: godfrey he Fix For: 1.11.0 {{CsvTableSink}} supports a {{writeMode}}, which can be {{OVERWRITE}} or {{NO_OVERWRITE}}. When we execute "INSERT OVERWRITE csv_table_sink xx", the planners check whether the table sink is an {{OverwritableTableSink}}. Since {{CsvTableSink}} does not extend {{OverwritableTableSink}}, we cannot execute the above statement. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17383) flink legacy planner should not use CollectionEnvironment any more
godfrey he created FLINK-17383: -- Summary: flink legacy planner should not use CollectionEnvironment any more Key: FLINK-17383 URL: https://issues.apache.org/jira/browse/FLINK-17383 Project: Flink Issue Type: Improvement Components: Table SQL / Legacy Planner Reporter: godfrey he Fix For: 1.11.0 As discussed in https://github.com/apache/flink/pull/11794, {{CollectionEnvironment}} is not a good practice, as it does not go through all the steps that a regular user program would go through. We should change the tests to use {{LocalEnvironment}}. The commit "Introduce CollectionPipelineExecutor for CollectionEnvironment ([c983ac9|https://github.com/apache/flink/commit/c983ac9c49b7b58394574efdde4f39e8d33a8582])" should also be reverted at that moment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17303) Return TableResult for Python TableEnvironment
godfrey he created FLINK-17303: -- Summary: Return TableResult for Python TableEnvironment Key: FLINK-17303 URL: https://issues.apache.org/jira/browse/FLINK-17303 Project: Flink Issue Type: Improvement Components: API / Python Reporter: godfrey he Fix For: 1.11.0 [FLINK-16366|https://issues.apache.org/jira/browse/FLINK-16366] supports executing a statement and returning a {{TableResult}} object, from which one can get the {{JobClient}} (associated with the submitted Flink job), collect the execution result, or print the execution result. In Python, we should also introduce a Python TableResult class to stay consistent with Java. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17267) supports EXPLAIN statement in TableEnvironment#executeSql and Table#explain api
godfrey he created FLINK-17267: -- Summary: supports EXPLAIN statement in TableEnvironment#executeSql and Table#explain api Key: FLINK-17267 URL: https://issues.apache.org/jira/browse/FLINK-17267 Project: Flink Issue Type: Sub-task Reporter: godfrey he Fix For: 1.11.0 [FLINK-16366|https://issues.apache.org/jira/browse/FLINK-16366] has introduced the executeSql method in TableEnvironment, but the EXPLAIN statement is not supported because [FLINK-17126|https://issues.apache.org/jira/browse/FLINK-17126] is not finished. This issue aims to support the EXPLAIN statement after FLINK-17126 is finished, and to introduce the Table#explain API at the same time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17252) TableEnvironment#executeSql supports SELECT statement and Table introduces execute method
godfrey he created FLINK-17252: -- Summary: TableEnvironment#executeSql supports SELECT statement and Table introduces execute method Key: FLINK-17252 URL: https://issues.apache.org/jira/browse/FLINK-17252 Project: Flink Issue Type: Sub-task Reporter: godfrey he Fix For: 1.11.0 [FLINK-16366|https://issues.apache.org/jira/browse/FLINK-16366] has introduced the executeSql method in TableEnvironment, but the SELECT statement is not supported because [FLINK-17126|https://issues.apache.org/jira/browse/FLINK-17126] is not finished and the design of [FLINK-14807|https://issues.apache.org/jira/browse/FLINK-14807] is still under discussion (the overall designs of Table#collect and Table#execute are similar, although Table does not introduce a collect method, per the conclusion of FLIP-84). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17251) TableEnvironment#executeSql supports INSERT statement
godfrey he created FLINK-17251: -- Summary: TableEnvironment#executeSql supports INSERT statement Key: FLINK-17251 URL: https://issues.apache.org/jira/browse/FLINK-17251 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: godfrey he Fix For: 1.11.0 [FLINK-16366|https://issues.apache.org/jira/browse/FLINK-16366] has introduced the executeSql method in TableEnvironment, but the INSERT statement is not supported because [FLINK-17126|https://issues.apache.org/jira/browse/FLINK-17126] is not finished. This issue aims to support the INSERT statement after FLINK-17126 is finished. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17126) Correct the execution behavior of BatchTableEnvironment
godfrey he created FLINK-17126: -- Summary: Correct the execution behavior of BatchTableEnvironment Key: FLINK-17126 URL: https://issues.apache.org/jira/browse/FLINK-17126 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: godfrey he Fix For: 1.11.0 This issue is similar to [FLINK-16363|https://issues.apache.org/jira/browse/FLINK-16363]. In previous versions, BatchTableEnvironment.execute() could trigger both table and DataSet programs. Since 1.11.0, table programs can only be triggered by BatchTableEnvironment.execute(). Once a table program is converted into a DataSet program (through the toDataSet() method), it can only be triggered by ExecutionEnvironment.execute(). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17052) Introduce PlanGenerator
godfrey he created FLINK-17052: -- Summary: Introduce PlanGenerator Key: FLINK-17052 URL: https://issues.apache.org/jira/browse/FLINK-17052 Project: Flink Issue Type: Sub-task Components: Client / Job Submission Reporter: godfrey he Fix For: 1.11.0 As discussed in [FLINK-16533|https://issues.apache.org/jira/browse/FLINK-16533], we move most of the logic of the {{ExecutionEnvironment#createProgramPlan}} method into a {{PlanGenerator}}, which can be used by {{ExecutionEnvironment}} and flink-table-planner. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16881) use Catalog's total size info in planner
godfrey he created FLINK-16881: -- Summary: use Catalog's total size info in planner Key: FLINK-16881 URL: https://issues.apache.org/jira/browse/FLINK-16881 Project: Flink Issue Type: New Feature Components: Table SQL / Planner Reporter: godfrey he In some cases, the {{Catalog}} only contains {{totalSize}} and the row count is unknown. We can also use {{totalSize}} to infer the row count, or even use {{totalSize}} to decide whether a join should be a broadcast join. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16822) The config set by SET command does not work
godfrey he created FLINK-16822: -- Summary: The config set by SET command does not work Key: FLINK-16822 URL: https://issues.apache.org/jira/browse/FLINK-16822 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.10.0 Reporter: godfrey he Fix For: 1.11.0 Users can add or change properties for execution behavior through the SET command in the SQL client, e.g. {{SET execution.parallelism=10}}, {{SET table.optimizer.join-reorder-enabled=true}}. But the {{table.xx}} configs cannot change the TableEnvironment's behavior, because the properties set from the CLI are not passed into the TableEnvironment's table config. -- This message was sent by Atlassian Jira (v8.3.4#803005)
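The shape of the missing step can be sketched as follows. This is an assumption about the fix, not the SQL client's actual code: the method name and the plain string maps stand in for the client's session properties and the TableEnvironment's TableConfig.

```java
// Hedged sketch of the missing forwarding step: every "table."-prefixed
// property from the CLI session should reach the TableEnvironment's config.
// forwardTableOptions and the Map types are illustrative, not Flink's API.
import java.util.Map;

public class SessionConfigForwarder {

    /** Copy table.* entries from the SET-command properties into the table config. */
    public static Map<String, String> forwardTableOptions(
            Map<String, String> sessionProperties, Map<String, String> tableConfig) {
        for (Map.Entry<String, String> e : sessionProperties.entrySet()) {
            if (e.getKey().startsWith("table.")) {
                tableConfig.put(e.getKey(), e.getValue());
            }
        }
        return tableConfig;
    }
}
```

Non-`table.*` keys such as `execution.parallelism` are handled by the client's own execution settings and are deliberately not forwarded in this sketch.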
[jira] [Created] (FLINK-16535) BatchTableSink#emitDataSet returns DataSink
godfrey he created FLINK-16535: -- Summary: BatchTableSink#emitDataSet returns DataSink Key: FLINK-16535 URL: https://issues.apache.org/jira/browse/FLINK-16535 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.11.0 Add a return value to {{BatchTableSink#emitDataSet}} to support generating a {{DataSet}} plan in {{BatchTableEnvironment}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16533) ExecutionEnvironment supports executing plan
godfrey he created FLINK-16533: -- Summary: ExecutionEnvironment supports executing plan Key: FLINK-16533 URL: https://issues.apache.org/jira/browse/FLINK-16533 Project: Flink Issue Type: Sub-task Components: Client / Job Submission Reporter: godfrey he Fix For: 1.11.0 Currently, {{ExecutionEnvironment}} only supports executing the plan generated by itself. FLIP-84 proposes that {{TableEnvironment}} can only trigger the table program and that {{StreamExecutionEnvironment}}/{{ExecutionEnvironment}} can only trigger the {{DataStream}}/{{DataSet}} program. This requires that {{ExecutionEnvironment}} can execute the plan generated by {{TableEnvironment}}. We propose to add two methods to {{ExecutionEnvironment}} (similar to {{StreamExecutionEnvironment}}#execute(StreamGraph) and {{StreamExecutionEnvironment}}#executeAsync(StreamGraph)):
{code:java}
@Internal
public JobExecutionResult execute(Plan plan) throws Exception { ... }

@Internal
public JobClient executeAsync(Plan plan) throws Exception { ... }
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16519) CheckpointCoordinatorFailureTest logs LinkageErrors
godfrey he created FLINK-16519: -- Summary: CheckpointCoordinatorFailureTest logs LinkageErrors Key: FLINK-16519 URL: https://issues.apache.org/jira/browse/FLINK-16519 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Reporter: godfrey he Fix For: 1.11.0 This issue is in https://travis-ci.org/apache/flink/jobs/660152153?utm_medium=notification&utm_source=slack Log output:
{code:java}
2020-03-09 15:52:14,550 main ERROR Could not reconfigure JMX java.lang.LinkageError: loader constraint violation: loader (instance of org/powermock/core/classloader/javassist/JavassistMockClassLoader) previously initiated loading for a different type with name "javax/management/MBeanServer"
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
at org.powermock.core.classloader.javassist.JavassistMockClassLoader.loadUnmockedClass(JavassistMockClassLoader.java:90)
at org.powermock.core.classloader.MockClassLoader.loadClassByThisClassLoader(MockClassLoader.java:104)
at org.powermock.core.classloader.DeferSupportingClassLoader.loadClass1(DeferSupportingClassLoader.java:147)
at org.powermock.core.classloader.DeferSupportingClassLoader.loadClass(DeferSupportingClassLoader.java:98)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
at org.apache.logging.log4j.core.jmx.Server.unregisterAllMatching(Server.java:337)
at org.apache.logging.log4j.core.jmx.Server.unregisterLoggerContext(Server.java:261)
at org.apache.logging.log4j.core.jmx.Server.reregisterMBeansAfterReconfigure(Server.java:165)
at org.apache.logging.log4j.core.jmx.Server.reregisterMBeansAfterReconfigure(Server.java:141)
at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:590)
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:651)
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:668)
at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:253)
at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153)
at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45)
at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194)
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:138)
at org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:45)
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:48)
at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:30)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349)
at org.apache.flink.util.TestLogger.(TestLogger.java:36)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorFailureTest.(CheckpointCoordinatorFailureTest.java:55)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.createTestInstance(PowerMockJUnit44RunnerDelegateImpl.java:197)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.createTest(PowerMockJUnit44RunnerDelegateImpl.java:182)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:204)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:160)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:134)
at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:34)
at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:44)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:136)
at org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:117)
at org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:57)
at org.power
{code}
[jira] [Created] (FLINK-16367) Introduce createDmlBatch method in TableEnvironment
godfrey he created FLINK-16367: -- Summary: Introduce createDmlBatch method in TableEnvironment Key: FLINK-16367 URL: https://issues.apache.org/jira/browse/FLINK-16367 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: godfrey he Fix For: 1.11.0 As we deprecate the {{execute}} and {{explain}} methods because of the buffered SQLs/Tables execution problem, this issue aims to introduce a new method named createDmlBatch to support executing and explaining batched queries. The interfaces look like:
{code:java}
interface TableEnvironment {
    /**
     * Create a DmlBatch instance which can add dml statements or Tables to the batch;
     * the planner can optimize all added statements and Tables together for better performance.
     */
    DmlBatch createDmlBatch();
}

interface DmlBatch {
    /**
     * Add an insert statement to the batch.
     */
    void addInsert(String insert);

    /**
     * Add a Table with the given sink table name to the batch.
     */
    void addInsert(String targetPath, Table table);

    /**
     * Execute all statements and Tables as a batch.
     *
     * The added statements and Tables will be cleared when this method is called.
     */
    ResultTable execute() throws Exception;

    /**
     * Returns the AST and the execution plan to compute the result of all statements and Tables.
     *
     * @param extended if the plan should contain additional properties, e.g. estimated cost, traits
     */
    String explain(boolean extended);
}
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16366) Introduce executeStatement method in TableEnvironment
godfrey he created FLINK-16366: -- Summary: Introduce executeStatement method in TableEnvironment Key: FLINK-16366 URL: https://issues.apache.org/jira/browse/FLINK-16366 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: godfrey he Fix For: 1.11.0

This issue aims to introduce {{executeStatement}}, which synchronously executes the given single statement immediately and returns the execution result.

{code:java}
/**
 * Synchronously executes the given single statement immediately; the statement can be
 * DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE.
 * If the statement is translated to a Flink job, the result is not returned until the job is finished.
 *
 * @return the result for SHOW/DESCRIBE/EXPLAIN, the affected row count for DML (-1 means unknown),
 *         or a string message ("OK") for other statements.
 * @throws Exception which occurs during the execution.
 */
ResultTable executeStatement(String statement) throws Exception;
{code}

{code:java}
/**
 * A ResultTable is the representation of a statement execution result.
 */
public interface ResultTable {

    /** Gets the schema of the ResultTable. */
    TableSchema getResultSchema();

    /** Gets the result contents as iterable rows. */
    Iterable getResultRows();
}
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
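A minimal sketch of the proposed {{ResultTable}} contract in plain Java. Illustrative assumptions: column names stand in for {{TableSchema}}, and {{ofAffectedRows}} is an invented helper modeling the "affected row count, -1 means unknown" DML result described above:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the proposed ResultTable contract (not Flink code):
// a statement result exposes its schema and its rows as an Iterable.
public class ResultTableSketch {

    private final List<String> columnNames;
    private final List<List<Object>> rows;

    public ResultTableSketch(List<String> columnNames, List<List<Object>> rows) {
        this.columnNames = columnNames;
        this.rows = rows;
    }

    /** Column names standing in for the proposed TableSchema. */
    public List<String> getResultSchema() {
        return columnNames;
    }

    /** The result contents as iterable rows. */
    public Iterable<List<Object>> getResultRows() {
        return rows;
    }

    /** A DML result per the proposal: one row holding the affected row count (-1 means unknown). */
    public static ResultTableSketch ofAffectedRows(long count) {
        return new ResultTableSketch(
                Arrays.asList("affected_rowcount"),
                Arrays.asList(Arrays.asList((Object) count)));
    }
}
```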
[jira] [Created] (FLINK-16364) Deprecate the methods in TableEnvironment proposed by FLIP-84
godfrey he created FLINK-16364: -- Summary: Deprecate the methods in TableEnvironment proposed by FLIP-84 Key: FLINK-16364 URL: https://issues.apache.org/jira/browse/FLINK-16364 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: godfrey he Fix For: 1.11.0 In [FLIP-84|https://cwiki.apache.org/confluence/display/FLINK/FLIP-84%3A+Improve+%26+Refactor+API+of+TableEnvironment], We propose to deprecate the following methods in TableEnvironment: {code:java} void sqlUpdate(String sql) void insertInto(String targetPath, Table table) void execute(String jobName) String explain(boolean extended) Table fromTableSource(TableSource source) {code} This issue aims to deprecate them. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16363) Correct the execution behavior of TableEnvironment and StreamTableEnvironment
godfrey he created FLINK-16363: -- Summary: Correct the execution behavior of TableEnvironment and StreamTableEnvironment Key: FLINK-16363 URL: https://issues.apache.org/jira/browse/FLINK-16363 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: godfrey he Fix For: 1.11.0

Both {{TableEnvironment.execute()}} and {{StreamExecutionEnvironment.execute}} can trigger a Flink table program execution. However, if you use {{TableEnvironment}} to build a Flink table program, you must use {{TableEnvironment.execute()}} to trigger execution, because you can't get the {{StreamExecutionEnvironment}} instance. If you use {{StreamTableEnvironment}} to build a Flink table program, you can use either to trigger execution. If you convert a table program to a {{DataStream}} program (using {{StreamTableEnvironment.toAppendStream/toRetractStream}}), you can also use either. So it's hard to explain which {{execute}} method should be used.

To correct the current messy trigger points, we propose that for {{TableEnvironment}} and {{StreamTableEnvironment}} you must use {{TableEnvironment.execute()}} to trigger table program execution; once you convert the table program to a {{DataStream}} program (through the {{toAppendStream}} or {{toRetractStream}} method), you must use {{StreamExecutionEnvironment.execute}} to trigger the {{DataStream}} program. Please refer to [FLIP-84|https://cwiki.apache.org/confluence/display/FLINK/FLIP-84%3A+Improve+%26+Refactor+API+of+TableEnvironment] for more detail. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16362) remove deprecated method in StreamTableSink
godfrey he created FLINK-16362: -- Summary: remove deprecated method in StreamTableSink Key: FLINK-16362 URL: https://issues.apache.org/jira/browse/FLINK-16362 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: godfrey he Fix For: 1.11.0 [FLIP-84|https://cwiki.apache.org/confluence/display/FLINK/FLIP-84%3A+Improve+%26+Refactor+API+of+TableEnvironment] proposes to unify the behavior of {{TableEnvironment}} and {{StreamTableEnvironment}}, and requires that {{StreamTableSink}} always return a {{DataStream}}. However, {{StreamTableSink.emitDataStream}} returns nothing and has been deprecated since Flink 1.9, so we will remove it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16361) FLIP-84: Improve & Refactor API of TableEnvironment
godfrey he created FLINK-16361: -- Summary: FLIP-84: Improve & Refactor API of TableEnvironment Key: FLINK-16361 URL: https://issues.apache.org/jira/browse/FLINK-16361 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: godfrey he Fix For: 1.11.0

As the [FLIP-84|https://cwiki.apache.org/confluence/display/FLINK/FLIP-84%3A+Improve+%26+Refactor+API+of+TableEnvironment] document describes, we propose to deprecate the following methods in TableEnvironment:

{code:java}
void sqlUpdate(String sql)
void insertInto(String targetPath, Table table)
void execute(String jobName)
String explain(boolean extended)
Table fromTableSource(TableSource source)
{code}

Meanwhile, we propose to introduce the following new methods in TableEnvironment:

{code:java}
// Synchronously executes the given single statement immediately and returns the execution result.
ResultTable executeStatement(String statement)

public interface ResultTable {
    TableSchema getResultSchema();
    Iterable getResultRows();
}

// Creates a DmlBatch instance to which DML statements or Tables can be added,
// and which can explain or execute them as a batch.
DmlBatch createDmlBatch()

interface DmlBatch {
    void addInsert(String insert);
    void addInsert(String targetPath, Table table);
    ResultTable execute() throws Exception;
    String explain(boolean extended);
}
{code}

We also unify the Flink table program trigger point and propose that for TableEnvironment and StreamTableEnvironment you must use `TableEnvironment.execute()` to trigger table program execution; once you convert the table program to a DataStream program (through the `toAppendStream` or `toRetractStream` method), you must use `StreamExecutionEnvironment.execute` to trigger the DataStream program. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16322) wrong result after filter push down in parquet table source
godfrey he created FLINK-16322: -- Summary: wrong result after filter push down in parquet table source Key: FLINK-16322 URL: https://issues.apache.org/jira/browse/FLINK-16322 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: godfrey he Attachments: parquet-1-1.parquet

I get a wrong result when running the following query:

source schema: first VARCHAR, id INT, score DOUBLE, last VARCHAR

data (the parquet file is in the attachment):
("Mike", 1, 12.3d, "Smith"), ("Bob", 2, 45.6d, "Taylor"), ("Sam", 3, 7.89d, "Miller"), ("Peter", 4, 0.12d, "Smith"), ("Liz", 5, 34.5d, "Williams"), ("Sally", 6, 6.78d, "Miller"), ("Alice", 7, 90.1d, "Smith"), ("Kelly", 8, 2.34d, "Williams")

query: SELECT id, `first`, `last`, score FROM ParquetTable WHERE score < 3

The expected result size is 2; however, the actual result size is 0. -- This message was sent by Atlassian Jira (v8.3.4#803005)
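Evaluating the predicate by hand over the sample data confirms the expected result size of 2 (the rows for Peter, 0.12, and Kelly, 2.34). A plain-Java check, independent of Parquet and Flink:

```java
import java.util.Arrays;

// Plain-Java re-evaluation of the predicate above: WHERE score < 3 over
// the sample scores keeps exactly two rows (Peter's 0.12 and Kelly's 2.34).
public class FilterExpectation {

    // The score column of the sample data, in the order listed above.
    static final double[] SCORES = {12.3, 45.6, 7.89, 0.12, 34.5, 6.78, 90.1, 2.34};

    /** Counts rows whose score is strictly below the threshold. */
    public static long countScoreLessThan(double threshold) {
        return Arrays.stream(SCORES).filter(s -> s < threshold).count();
    }
}
```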
[jira] [Created] (FLINK-16315) throw JsonMappingException when using BatchTableEnvironment#explain to get the plan of sql with constant string
godfrey he created FLINK-16315: -- Summary: throw JsonMappingException when using BatchTableEnvironment#explain to get the plan of sql with constant string Key: FLINK-16315 URL: https://issues.apache.org/jira/browse/FLINK-16315 Project: Flink Issue Type: Bug Components: Table SQL / Legacy Planner Reporter: godfrey he {code:java} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); tEnv.registerTableSource("MyTable", CommonTestData.getCsvTableSource()); Table table = tEnv.sqlQuery("select * from MyTable where first = '274' "); System.out.println(tEnv.explain(table)); {code} when executing the above code, the following exception will occur. {panel:title=exception} org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException: Unexpected character ('U' (code 85)): was expecting comma to separate Object entries at [Source: (String)"{ "nodes": [ { "id": 2, "type": "source", "pact": "Data Source", "contents": "CsvTableSource(read fields: first, id, score, last)", "parallelism": "8", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniq"[truncated 3501 chars]; line: 41, column: 15] (through reference chain: org.apache.flink.table.explain.PlanTree["nodes"]->java.util.ArrayList[1]) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:394) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:365) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:302) at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:245) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:27) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:288) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:151) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4202) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3205) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3173) at org.apache.flink.table.explain.PlanJsonParser.getSqlExecutionPlan(PlanJsonParser.java:42) at org.apache.flink.table.api.internal.BatchTableEnvImpl.explain(BatchTableEnvImpl.scala:208) at org.apache.flink.table.api.internal.BatchTableEnvImpl.explain(BatchTableEnvImpl.scala:223) {panel} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16195) append constant field to unique key set on project when deriving unique keys in FlinkRelMdUniqueKeys
godfrey he created FLINK-16195: -- Summary: append constant field to unique key set on project when deriving unique keys in FlinkRelMdUniqueKeys Key: FLINK-16195 URL: https://issues.apache.org/jira/browse/FLINK-16195 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: godfrey he Currently, {{FlinkRelMdUniqueKeys}} only supports deriving unique keys on non-constant fields. For example, for `select a, b, 1, count(*) from T group by a, b`, the derived unique key is currently `a, b`. However, `a, b, 1` is also a unique key, so the result should be both `a, b` and `a, b, 1`. Note: ideally, the planner does not require the constant key in the unique key set; all constant values are pulled up or removed as much as possible. Supporting this improvement helps handle some corner cases in CBO. -- This message was sent by Atlassian Jira (v8.3.4#803005)
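The derivation can be checked with a small plain-Java sketch ({{ConstantKeySketch}} is a hypothetical helper, not planner code): if the projection onto {a, b} is duplicate-free, appending a constant column cannot introduce duplicates, while the constant column alone is not unique:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the unique-key reasoning above: appending a
// constant column to a unique key keeps the key unique, because the constant
// adds no distinguishing information and therefore no duplicates.
public class ConstantKeySketch {

    /** True if the projection of each row onto the given column indexes is unique. */
    public static boolean isUniqueKey(List<List<Object>> rows, int... cols) {
        Set<List<Object>> seen = new HashSet<>();
        for (List<Object> row : rows) {
            List<Object> key = new java.util.ArrayList<>();
            for (int c : cols) {
                key.add(row.get(c));
            }
            if (!seen.add(key)) {
                return false; // duplicate projection: not a unique key
            }
        }
        return true;
    }
}
```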
[jira] [Created] (FLINK-16110) LogicalTypeParser can't parse "TIMESTAMP(3) *ROWTIME*" and TIMESTAMP(3) *PROCTIME*
godfrey he created FLINK-16110: -- Summary: LogicalTypeParser can't parse "TIMESTAMP(3) *ROWTIME*" and TIMESTAMP(3) *PROCTIME* Key: FLINK-16110 URL: https://issues.apache.org/jira/browse/FLINK-16110 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.10.0 Reporter: godfrey he {{TIMESTAMP(3) *ROWTIME*}} is the string representation of {{TimestampType(true, TimestampKind.ROWTIME, 3)}} , however {{LogicalTypeParser}} can't convert it to {{TimestampType(true, TimestampKind.ROWTIME, 3)}}. TIMESTAMP(3) *PROCTIME* is the same case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15669) SQL client can't cancel flink job
godfrey he created FLINK-15669: -- Summary: SQL client can't cancel flink job Key: FLINK-15669 URL: https://issues.apache.org/jira/browse/FLINK-15669 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.10.0 Reporter: godfrey he Fix For: 1.10.0

In the SQL client, the CLI client cancels a query through the {{void cancelQuery(String sessionId, String resultId)}} method in {{Executor}}. However, the {{resultId}} is a random UUID, not the job id, so the CLI client can't cancel a running job.

{code:java}
private ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext context, String query) {
    ..
    // store the result with a unique id
    final String resultId = UUID.randomUUID().toString();
    resultStore.storeResult(resultId, result);
    ..
    // create execution
    final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);

    // start result retrieval
    result.startRetrieval(deployer);

    return new ResultDescriptor(
        resultId,
        removeTimeAttributes(table.getSchema()),
        result.isMaterialized());
}

private void cancelQueryInternal(ExecutionContext context, String resultId) {
    ..
    // stop Flink job
    try (final ClusterDescriptor clusterDescriptor = context.createClusterDescriptor()) {
        ClusterClient clusterClient = null;
        try {
            // retrieve existing cluster
            clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
            try {
                clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
            } catch (Throwable t) {
                // the job might have finished earlier
            }
        } catch (Exception e) {
            throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
        } finally {
            try {
                if (clusterClient != null) {
                    clusterClient.close();
                }
            } catch (Exception e) {
                // ignore
            }
        }
    } catch (SqlExecutionException e) {
        throw e;
    } catch (Exception e) {
        throw new SqlExecutionException("Could not locate a cluster.", e);
    }
}
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15472) Support SQL Client Gateway
godfrey he created FLINK-15472: -- Summary: Support SQL Client Gateway Key: FLINK-15472 URL: https://issues.apache.org/jira/browse/FLINK-15472 Project: Flink Issue Type: New Feature Components: Table SQL / Client Reporter: godfrey he FLIP-91: https://cwiki.apache.org/confluence/display/FLINK/FLIP-91%3A+Support+SQL+Client+Gateway design document: https://docs.google.com/document/d/1T7--664rts4t_4gjRPw937S9ln9Plf1yghNQ9IiHQtQ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15123) remove uniqueKeys from FlinkStatistic in blink planner
godfrey he created FLINK-15123: -- Summary: remove uniqueKeys from FlinkStatistic in blink planner Key: FLINK-15123 URL: https://issues.apache.org/jira/browse/FLINK-15123 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: godfrey he {{uniqueKeys}} is a kind of constraint; it's unreasonable to treat it as a kind of statistic, so we should remove uniqueKeys from {{FlinkStatistic}} in the blink planner. Some temporary solutions (e.g. {{RichTableSourceQueryOperation}}) should also be resolved after primaryKey is introduced in {{TableSchema}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15095) bridge table schema's primary key to metadata handler in blink planner
godfrey he created FLINK-15095: -- Summary: bridge table schema's primary key to metadata handler in blink planner Key: FLINK-15095 URL: https://issues.apache.org/jira/browse/FLINK-15095 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.10.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15004) Choose SortMergeJoin instead of HashJoin if the statistics is unknown
godfrey he created FLINK-15004: -- Summary: Choose SortMergeJoin instead of HashJoin if the statistics is unknown Key: FLINK-15004 URL: https://issues.apache.org/jira/browse/FLINK-15004 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Affects Versions: 1.9.1, 1.9.0 Reporter: godfrey he Fix For: 1.10.0 Currently, the blink planner uses the default rowCount value (defined in {{FlinkPreparingTableBase#DEFAULT_ROWCOUNT}}) when the statistics are unknown, and may choose {{HashJoin}} instead of {{SortMergeJoin}}. The job will hang if the build side has a huge input size, so it's better to use {{SortMergeJoin}} for execution stability if the statistics are unknown. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15001) The digest of sub-plan reuse should contain RelNode's trait
godfrey he created FLINK-15001: -- Summary: The digest of sub-plan reuse should contain RelNode's trait Key: FLINK-15001 URL: https://issues.apache.org/jira/browse/FLINK-15001 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.1, 1.9.0 Reporter: godfrey he Fix For: 1.10.0 Attachments: image-2019-12-02-10-49-46-916.png, image-2019-12-02-10-52-01-399.png This bug is found in [FLINK-14946| https://issues.apache.org/jira/browse/FLINK-14946]: The plan for the given sql in [FLINK-14946| https://issues.apache.org/jira/browse/FLINK-14946] is !image-2019-12-02-10-49-46-916.png! however, the plan after sub-plan reuse is: !image-2019-12-02-10-52-01-399.png! in the first picture, we could find that the accMode of two joins are different, but the two joins are reused in the second picture. The reason is the digest of sub-plan reuse does not contain RelNode's trait now. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14999) sub-plan reuse should consider
godfrey he created FLINK-14999: -- Summary: sub-plan reuse should consider Key: FLINK-14999 URL: https://issues.apache.org/jira/browse/FLINK-14999 Project: Flink Issue Type: Bug Reporter: godfrey he -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14899) can not be translated to StreamExecDeduplicate when PROCTIME is defined in query
godfrey he created FLINK-14899: -- Summary: can not be translated to StreamExecDeduplicate when PROCTIME is defined in query Key: FLINK-14899 URL: https://issues.apache.org/jira/browse/FLINK-14899 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.1, 1.9.0 Reporter: godfrey he Fix For: 1.10.0 CREATE TABLE user_log ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'user_behavior', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.0.key' = 'zookeeper.connect', 'connector.properties.0.value' = 'localhost:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'localhost:9092', 'update-mode' = 'append', 'format.type' = 'json', 'format.derive-schema' = 'true' ); CREATE TABLE user_dist ( dt VARCHAR, user_id VARCHAR, behavior VARCHAR ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', 'connector.table' = 'user_behavior_dup', 'connector.username' = 'root', 'connector.password' = ‘**', 'connector.write.flush.max-rows' = '1' ); INSERT INTO user_dist SELECT dt, user_id, behavior FROM ( SELECT dt, user_id, behavior, ROW_NUMBER() OVER (PARTITION BY dt, user_id, behavior ORDER BY proc asc ) AS rownum FROM (select DATE_FORMAT(ts, '-MM-dd HH:00') as dt,user_id,behavior,PROCTIME() as proc from user_log) ) WHERE rownum = 1; Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. 
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14874) add local aggregate to solve data skew for ROLLUP/CUBE case
godfrey he created FLINK-14874: -- Summary: add local aggregate to solve data skew for ROLLUP/CUBE case Key: FLINK-14874 URL: https://issues.apache.org/jira/browse/FLINK-14874 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Affects Versions: 1.9.1, 1.9.0 Reporter: godfrey he Fix For: 1.10.0 Many TPC-DS queries have the {{rollup}} keyword, which is translated to multiple groups. For example, {{group by rollup (channel, id)}} is equivalent to {{group by (channel, id)}} + {{group by (channel)}} + {{group by ()}}. All data for the empty group is shuffled to a single node, which is a typical data skew case. If there is a local aggregate, the data size shuffled to that single node is greatly reduced. However, the cost model currently can't estimate the local aggregate's cost, and a plan without a local aggregate may be chosen even when the query has the {{rollup}} keyword. We could add a rule-based phase (after the physical phase) to enforce a local aggregate if its input has an empty group. -- This message was sent by Atlassian Jira (v8.3.4#803005)
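The grouping-set expansion described above can be sketched in plain Java: rollup over n keys produces the n+1 key prefixes down to the empty group, and every input row contributes to that empty group, which is computed on a single node. {{RollupExpansion}} is an invented illustration, not planner code:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the ROLLUP expansion above: rollup(channel, id)
// yields the grouping sets (channel, id), (channel) and the empty group ().
public class RollupExpansion {

    /** Expands a rollup key list into its grouping sets: all prefixes down to (). */
    public static List<List<String>> rollup(List<String> keys) {
        List<List<String>> sets = new ArrayList<>();
        for (int n = keys.size(); n >= 0; n--) {
            sets.add(new ArrayList<>(keys.subList(0, n))); // prefix of length n
        }
        return sets;
    }
}
```

Because the last grouping set is always empty, every row maps to the same (empty) grouping key there, which is exactly the skew a local aggregate would pre-reduce.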
[jira] [Created] (FLINK-14789) extends max/min type in ColumnStats from Number to Comparable
godfrey he created FLINK-14789: -- Summary: extends max/min type in ColumnStats from Number to Comparable Key: FLINK-14789 URL: https://issues.apache.org/jira/browse/FLINK-14789 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.10.0 Many TPC-DS queries have predicates on dates, like `d_date between '1999-02-01' and (cast('1999-02-01' as date) + INTERVAL '60' day)`; it's very useful for finding a better plan if the planner knows the max/min values of the date column. However, max/min in {{ColumnStats}} currently only supports the {{Number}} type. This issue aims to extend the max/min type from {{Number}} to {{Comparable}}, so that {{Date}}, {{Time}}, {{Timestamp}} and even {{String}} can be supported. -- This message was sent by Atlassian Jira (v8.3.4#803005)
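Once max/min are typed as {{Comparable}} instead of {{Number}}, one generic helper covers numbers, ISO-formatted date strings, and arbitrary strings alike. A plain-Java sketch of the idea ({{ComparableStats}} is invented for illustration, not the ColumnStats implementation):

```java
// Illustrative sketch of the extension described above: with Comparable
// bounds, the same max/min helpers work for Integer, String, LocalDate, etc.
public class ComparableStats {

    /** Returns the larger of two comparable values. */
    public static <T extends Comparable<T>> T max(T a, T b) {
        return a.compareTo(b) >= 0 ? a : b;
    }

    /** Returns the smaller of two comparable values. */
    public static <T extends Comparable<T>> T min(T a, T b) {
        return a.compareTo(b) <= 0 ? a : b;
    }
}
```

ISO-8601 date strings compare correctly under lexicographic order, which is why even a string-typed date column benefits from such bounds.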
[jira] [Created] (FLINK-14724) Join condition could be simplified in logical phase
godfrey he created FLINK-14724: -- Summary: Join condition could be simplified in logical phase Key: FLINK-14724 URL: https://issues.apache.org/jira/browse/FLINK-14724 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Affects Versions: 1.9.1, 1.9.0 Reporter: godfrey he Fix For: 1.10.0 Currently the plan of TPC-DS q38.sql contains {{NestedLoopJoin}}, because its join condition is {{CAST(AND(IS NOT DISTINCT FROM($2, $3), IS NOT DISTINCT FROM($1, $4), IS NOT DISTINCT FROM($0, $5))):BOOLEAN}}, and the planner can't find equal join keys in the condition via {{Join#analyzeCondition}}. {{SimplifyJoinConditionRule}} could solve this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14656) blink planner should convert catalog statistics to TableStats for permanent table instead of temporary table
godfrey he created FLINK-14656: -- Summary: blink planner should convert catalog statistics to TableStats for permanent table instead of temporary table Key: FLINK-14656 URL: https://issues.apache.org/jira/browse/FLINK-14656 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.1, 1.9.0 Reporter: godfrey he Fix For: 1.10.0 Currently, the blink planner converts {{CatalogTable}} to a Calcite {{Table}} and converts the catalog statistics to {{TableStats}} in {{DatabaseCalciteSchema}}. However, the catalog statistics conversion is currently only done for temporary tables, which never have any statistics. It should be done for permanent tables. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-13708) transformations should be cleared because a table environment could execute multiple job
godfrey he created FLINK-13708: -- Summary: transformations should be cleared because a table environment could execute multiple job Key: FLINK-13708 URL: https://issues.apache.org/jira/browse/FLINK-13708 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.9.0 Currently, if a table environment executes more than one SQL job, a later job contains the transformations of the previous job, because the transformations are not cleared after execution. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13611) Introduce analyze statistic utility to generate table & column statistics
godfrey he created FLINK-13611: -- Summary: Introduce analyze statistic utility to generate table & column statistics Key: FLINK-13611 URL: https://issues.apache.org/jira/browse/FLINK-13611 Project: Flink Issue Type: New Feature Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.10.0

This issue aims to introduce a utility class to generate table & column statistics. The main steps are:
1. generate the SQL, like {{select approx_count_distinct(a) as ndv, count(1) - count(a) as nullCount, avg(char_length(a)) as avgLen, max(char_length(a)) as maxLen, max(a) as maxValue, min(a) as minValue, ... from MyTable}}
2. execute the query
3. convert the result to {{TableStats}} (maybe the source table is not a catalog table)
4. convert {{TableStats}} to {{CatalogTableStatistics}} if needed

This issue does not involve DDL; however, DDL could use this utility class once it's supported. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
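Step 1's aggregates can be mirrored directly in plain Java for a single string column, which also documents what each measure means. {{ColumnStatsSketch}} is a hypothetical helper for illustration, not the proposed utility class:

```java
import java.util.List;
import java.util.Objects;

// Hypothetical mirror of the generated SQL's aggregates for one string
// column: ndv (the SQL uses approx_count_distinct; exact here), nullCount
// (count(1) - count(a)), and maxLen (max(char_length(a))).
public class ColumnStatsSketch {

    /** Number of distinct non-null values. */
    public static long ndv(List<String> col) {
        return col.stream().filter(Objects::nonNull).distinct().count();
    }

    /** Rows where the column is NULL. */
    public static long nullCount(List<String> col) {
        return col.stream().filter(Objects::isNull).count();
    }

    /** Maximum character length over non-null values (0 for an all-null column). */
    public static int maxLen(List<String> col) {
        return col.stream().filter(Objects::nonNull).mapToInt(String::length).max().orElse(0);
    }
}
```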
[jira] [Created] (FLINK-13564) blink planner should also throw exception if constant with YEAR TO MONTH resolution was used for group windows
godfrey he created FLINK-13564: -- Summary: blink planner should also throw exception if constant with YEAR TO MONTH resolution was used for group windows Key: FLINK-13564 URL: https://issues.apache.org/jira/browse/FLINK-13564 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.0, 1.10.0 Reporter: godfrey he Fix For: 1.9.0, 1.10.0 Just as in [FLINK-11017|https://issues.apache.org/jira/browse/FLINK-11017], the blink planner should also throw an exception if a constant with YEAR TO MONTH resolution is used for group windows. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13563) TumblingGroupWindow should implement toString method
godfrey he created FLINK-13563: -- Summary: TumblingGroupWindow should implement toString method Key: FLINK-13563 URL: https://issues.apache.org/jira/browse/FLINK-13563 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.0, 1.10.0 Reporter: godfrey he Fix For: 1.9.0, 1.10.0

{code:scala}
@Test
def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
  val util = streamTestUtil()
  val table = util.addDataStream[(Long, Int, String)](
    "T1", 'long, 'int, 'string, 'rowtime.rowtime)
  val windowedTable = table
    .window(Tumble over 5.millis on 'rowtime as 'w)
    .groupBy('w)
    .select('int.count)
  util.verifyPlan(windowedTable)
}
{code}

Currently, its physical plan is:

{code:java}
HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_COUNT(count$0) AS EXPR$0])
+- Exchange(distribution=[single])
   +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_COUNT(int) AS count$0])
      +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
{code}

We know nothing about the TumblingGroupWindow except its name. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13562) throws exception when FlinkRelMdColumnInterval meets two stage stream group aggregate
godfrey he created FLINK-13562: -- Summary: throws exception when FlinkRelMdColumnInterval meets two stage stream group aggregate Key: FLINK-13562 URL: https://issues.apache.org/jira/browse/FLINK-13562 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.9.0, 1.10.0 test case: {code:scala} @Test def testTwoDistinctAggregateWithNonDistinctAgg(): Unit = { util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) util.verifyPlan("SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM MyTable GROUP BY c") } {code} org.apache.flink.table.api.TableException: Sum aggregate function does not support type: ''VARCHAR''. Please re-check the data type. at org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createSumAggFunction(AggFunctionFactory.scala:191) at org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:74) at org.apache.flink.table.planner.plan.utils.AggregateUtil$$anonfun$9.apply(AggregateUtil.scala:285) at org.apache.flink.table.planner.plan.utils.AggregateUtil$$anonfun$9.apply(AggregateUtil.scala:279) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:279) at org.apache.flink.table.planner.plan.utils.AggregateUtil$.getOutputIndexToAggCallIndexMap(AggregateUtil.scala:154) at org.apache.flink.table.planner.plan.metadata.FlinkRelMdColumnInterval.getAggCallIndexInLocalAgg$1(FlinkRelMdColumnInterval.scala:504) at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdColumnInterval.estimateColumnIntervalOfAggregate(FlinkRelMdColumnInterval.scala:526) at org.apache.flink.table.planner.plan.metadata.FlinkRelMdColumnInterval.getColumnInterval(FlinkRelMdColumnInterval.scala:417) at GeneratedMetadataHandler_ColumnInterval.getColumnInterval_$(Unknown Source) at GeneratedMetadataHandler_ColumnInterval.getColumnInterval(Unknown Source) at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getColumnInterval(FlinkRelMetadataQuery.java:122) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13545) JoinToMultiJoinRule should not match SEMI/ANTI LogicalJoin
godfrey he created FLINK-13545: -- Summary: JoinToMultiJoinRule should not match SEMI/ANTI LogicalJoin Key: FLINK-13545 URL: https://issues.apache.org/jira/browse/FLINK-13545 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.0, 1.10.0 Reporter: godfrey he Fix For: 1.9.0, 1.10.0 Running TPC-DS query 14.a on the Blink planner throws an exception:
{code}
java.lang.ArrayIndexOutOfBoundsException: 84
	at org.apache.calcite.rel.rules.JoinToMultiJoinRule$InputReferenceCounter.visitInputRef(JoinToMultiJoinRule.java:564)
	at org.apache.calcite.rel.rules.JoinToMultiJoinRule$InputReferenceCounter.visitInputRef(JoinToMultiJoinRule.java:555)
	at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
	at org.apache.calcite.rex.RexVisitorImpl.visitCall(RexVisitorImpl.java:80)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
	at org.apache.calcite.rel.rules.JoinToMultiJoinRule.addOnJoinFieldRefCounts(JoinToMultiJoinRule.java:481)
	at org.apache.calcite.rel.rules.JoinToMultiJoinRule.onMatch(JoinToMultiJoinRule.java:166)
	at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
	at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
	at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
{code}
The reason is that {{JoinToMultiJoinRule}} matches SEMI/ANTI {{LogicalJoin}}, which it should not. Before Calcite 1.20, a SEMI join was represented by {{SemiJoin}}, which is not matched by {{JoinToMultiJoinRule}}.
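The fix amounts to refusing to fire the rule on the new join types. The sketch below is a self-contained illustration under assumed names: {{JoinRelType}} here is a stand-in for Calcite's enum, and {{ruleMatches}} stands in for the guard added to {{JoinToMultiJoinRule}}'s match condition; it is not Calcite's actual code.

```java
// Illustrative stand-ins for Calcite's JoinRelType enum and the match guard
// added to JoinToMultiJoinRule; the names here are hypothetical.
public class JoinRuleGuard {
    // Mimics org.apache.calcite.rel.core.JoinRelType as of Calcite 1.20.
    enum JoinRelType { INNER, LEFT, RIGHT, FULL, SEMI, ANTI }

    // A MultiJoin flattens joins whose output carries columns from both
    // inputs; SEMI/ANTI joins emit only the left input's columns, so firing
    // the rule on them produces out-of-range field references like the
    // ArrayIndexOutOfBoundsException above.
    static boolean ruleMatches(JoinRelType joinType) {
        return joinType != JoinRelType.SEMI && joinType != JoinRelType.ANTI;
    }

    public static void main(String[] args) {
        System.out.println(ruleMatches(JoinRelType.INNER)); // prints "true"
        System.out.println(ruleMatches(JoinRelType.SEMI));  // prints "false"
    }
}
```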
[jira] [Created] (FLINK-13502) CatalogTableStatisticsConverter should be in planner.utils package
godfrey he created FLINK-13502: -- Summary: CatalogTableStatisticsConverter should be in planner.utils package Key: FLINK-13502 URL: https://issues.apache.org/jira/browse/FLINK-13502 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.9.0, 1.10.0 Currently, {{CatalogTableStatisticsConverter}} lives in {{org.apache.flink.table.util}}; its correct package is {{org.apache.flink.table.planner.utils}}.
[jira] [Created] (FLINK-13404) Port csv factories & validator from flink-table-planner to flink-csv
godfrey he created FLINK-13404: -- Summary: Port csv factories & validator from flink-table-planner to flink-csv Key: FLINK-13404 URL: https://issues.apache.org/jira/browse/FLINK-13404 Project: Flink Issue Type: Sub-task Components: Table SQL / Legacy Planner Reporter: godfrey he Fix For: 1.9.0, 1.10.0 The Blink planner does not define any CSV factories or validator, so this issue ports them from flink-table-planner to flink-csv and lets both planners share them.
[jira] [Created] (FLINK-13403) Correct package name after relocation
godfrey he created FLINK-13403: -- Summary: Correct package name after relocation Key: FLINK-13403 URL: https://issues.apache.org/jira/browse/FLINK-13403 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.9.0, 1.10.0 Some Scala classes' package names were not updated after [FLINK-13266|https://issues.apache.org/jira/browse/FLINK-13267]; this issue corrects them.
[jira] [Created] (FLINK-13347) should handle new JoinRelType(SEMI/ANTI) in switch case
godfrey he created FLINK-13347: -- Summary: should handle new JoinRelType(SEMI/ANTI) in switch case Key: FLINK-13347 URL: https://issues.apache.org/jira/browse/FLINK-13347 Project: Flink Issue Type: Bug Components: Table SQL / Legacy Planner, Table SQL / Planner Reporter: godfrey he Fix For: 1.9.0, 1.10.0 Calcite 1.20 introduces {{SEMI}} and {{ANTI}} to {{JoinRelType}}; both the Blink planner and the Flink planner must handle them in every switch case over {{JoinRelType}}.
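To illustrate the shape of the change: a switch written against the pre-1.20 enum either misses the new values at compile time or falls into a default branch at runtime. The sketch below uses a stand-in enum (a hypothetical copy, not Calcite's actual class) to show an exhaustive switch with explicit {{SEMI}}/{{ANTI}} cases.

```java
// Stand-in for Calcite's JoinRelType (hypothetical copy, not the real class),
// showing the exhaustive switch both planners need after the 1.20 bump.
public class JoinTypeSwitch {
    enum JoinRelType { INNER, LEFT, RIGHT, FULL, SEMI, ANTI }

    static String toSqlKeyword(JoinRelType type) {
        switch (type) {
            case INNER: return "INNER JOIN";
            case LEFT:  return "LEFT OUTER JOIN";
            case RIGHT: return "RIGHT OUTER JOIN";
            case FULL:  return "FULL OUTER JOIN";
            case SEMI:  return "SEMI JOIN"; // new in Calcite 1.20
            case ANTI:  return "ANTI JOIN"; // new in Calcite 1.20
            default:
                // Failing loudly here surfaces any future enum additions
                // instead of silently producing a wrong plan.
                throw new IllegalStateException("unhandled join type: " + type);
        }
    }
}
```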
[jira] [Created] (FLINK-13269) copy RelDecorrelator from blink planner to flink planner to fix CALCITE-3169 & CALCITE-3170
godfrey he created FLINK-13269: -- Summary: copy RelDecorrelator from blink planner to flink planner to fix CALCITE-3169 & CALCITE-3170 Key: FLINK-13269 URL: https://issues.apache.org/jira/browse/FLINK-13269 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: godfrey he Assignee: godfrey he Fix For: 1.9.0, 1.10.0 [CALCITE-3169|https://issues.apache.org/jira/browse/CALCITE-3169] and [CALCITE-3170|https://issues.apache.org/jira/browse/CALCITE-3170] are not fixed in Calcite 1.20. {{RelDecorrelator}} was copied from Calcite into the Blink planner to resolve those two bugs. To make both planners available in one jar, {{RelDecorrelator}} should also be copied into the Flink planner.
[jira] [Created] (FLINK-13268) revert SqlSplittableAggFunction to make two planners available in one jar
godfrey he created FLINK-13268: -- Summary: revert SqlSplittableAggFunction to make two planners available in one jar Key: FLINK-13268 URL: https://issues.apache.org/jira/browse/FLINK-13268 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: godfrey he Assignee: godfrey he Fix For: 1.9.0, 1.10.0 Currently, {{SqlSplittableAggFunction}} is copied from Calcite and its {{topSplit}} method is extended to support LEFT/RIGHT outer joins (see [CALCITE-2378|http://issues.apache.org/jira/browse/CALCITE-2378]). This extension is currently only used for TPC-H, so we will revert the class to make both planners available in one jar.
[jira] [Created] (FLINK-13263) supports explain DAG plan in flink-python
godfrey he created FLINK-13263: -- Summary: supports explain DAG plan in flink-python Key: FLINK-13263 URL: https://issues.apache.org/jira/browse/FLINK-13263 Project: Flink Issue Type: New Feature Components: API / Python Reporter: godfrey he Assignee: godfrey he Fix For: 1.9.0, 1.10.0 Update the existing {{explain}} method to support explaining a DAG plan in flink-python.
[jira] [Created] (FLINK-13185) Bump Calcite dependency to 1.20.0 in sql parser & flink planner
godfrey he created FLINK-13185: -- Summary: Bump Calcite dependency to 1.20.0 in sql parser & flink planner Key: FLINK-13185 URL: https://issues.apache.org/jira/browse/FLINK-13185 Project: Flink Issue Type: Improvement Components: Table SQL / Legacy Planner Reporter: godfrey he Assignee: godfrey he The Blink planner has upgraded its Calcite version to 1.20.0 (previously 1.19.0), and the Blink planner will support DDL in Flink 1.9, which depends on flink-sql-parser. So the Calcite version in flink-sql-parser should also be upgraded to 1.20.0. [~walterddr], [FLINK-11935|https://issues.apache.org/jira/browse/FLINK-11935] will not be fixed in this issue, because supporting DDL in the Blink planner is blocked by it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)