[jira] [Created] (FLINK-29990) Unparsed SQL for SqlTableLike cannot be parsed correctly
Shuo Cheng created FLINK-29990:
--
Summary: Unparsed SQL for SqlTableLike cannot be parsed correctly
Key: FLINK-29990
URL: https://issues.apache.org/jira/browse/FLINK-29990
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.16.0
Reporter: Shuo Cheng
Fix For: 1.17.0

Consider the following DDL SQL:
{code:java}
create table source_table (
  a int,
  b bigint,
  c string
) LIKE parent_table
{code}
After being unparsed by the SQL parser, we get the following result:
{code:java}
CREATE TABLE `SOURCE_TABLE` (
  `A` INTEGER,
  `B` BIGINT,
  `C` STRING
) LIKE `PARENT_TABLE` ( )
{code}
An exception is thrown when trying to parse the unparsed SQL again, because of the trailing empty `( )` after the LIKE clause.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
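A minimal plain-Java sketch of the shape of the fix (hypothetical helper, not Flink's actual `SqlTableLike` unparse code): only emit the parenthesized option list when it is non-empty, so a `LIKE` clause without options round-trips through unparse/parse.

```java
import java.util.List;

// Hypothetical sketch: skip the "( )" suffix when the LIKE clause has no
// options, so the unparsed DDL stays parseable.
public class LikeClauseUnparse {
    public static String unparse(String parentTable, List<String> options) {
        StringBuilder sb = new StringBuilder("LIKE `").append(parentTable).append('`');
        if (!options.isEmpty()) { // before the fix, this suffix was emitted unconditionally
            sb.append(" (").append(String.join(", ", options)).append(")");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(unparse("PARENT_TABLE", List.of()));
        System.out.println(unparse("PARENT_TABLE", List.of("INCLUDING ALL")));
    }
}
```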
[jira] [Created] (FLINK-24762) Remove agg function: INCR_SUM
Shuo Cheng created FLINK-24762:
--
Summary: Remove agg function: INCR_SUM
Key: FLINK-24762
URL: https://issues.apache.org/jira/browse/FLINK-24762
Project: Flink
Issue Type: Improvement
Components: Table SQL / Runtime
Reporter: Shuo Cheng
Fix For: 1.15.0

The removal of INCR_SUM should have been completed in FLINK-13529, but that PR only removed the function definition of INCR_SUM.
[jira] [Created] (FLINK-24598) Current IT cases do not cover the fallback path for hash aggregate
Shuo Cheng created FLINK-24598:
--
Summary: Current IT cases do not cover the fallback path for hash aggregate
Key: FLINK-24598
URL: https://issues.apache.org/jira/browse/FLINK-24598
Project: Flink
Issue Type: Improvement
Components: Table SQL / Runtime
Affects Versions: 1.14.0, 1.15.0
Reporter: Shuo Cheng

The test data in AggregateITCaseBase#testBigData is not big enough to trigger the hash aggregate to sort and spill its buffer and fall back to the sort aggregate.
[jira] [Created] (FLINK-24048) Move changeLog inference out of optimizing phase
Shuo Cheng created FLINK-24048:
--
Summary: Move changeLog inference out of optimizing phase
Key: FLINK-24048
URL: https://issues.apache.org/jira/browse/FLINK-24048
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 1.14.0
Reporter: Shuo Cheng
Fix For: 1.15.0

Currently, when a SQL job has multiple sinks, the DAG is split into multiple RelNode blocks; since changeLog inference happens during the optimizing phase, the changeLog mode must be propagated among blocks so that each block can generate an accurate physical plan. In the current solution, the DAG is optimized three times, which is inefficient. Instead, we can optimize the DAG once, expand it into a physical node tree, and then infer the changeLog mode; this way, the DAG is optimized only one time. (Similarly, the minibatch interval can be inferred in the same way.)
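The proposed single-pass inference can be sketched in plain Java (all names here are illustrative stand-ins, not Flink's planner classes): after the DAG has been expanded into one physical node tree, a single bottom-up traversal suffices to decide whether each node's output is insert-only or updating.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of single-pass changeLog inference over an already
// expanded physical node tree: a node's output is updating if the node
// itself produces updates (e.g. an unbounded aggregate) or any input does.
public class ChangelogInference {
    enum Mode { INSERT_ONLY, UPDATING }

    static class Node {
        final String name;
        final boolean producesUpdates;
        final List<Node> inputs = new ArrayList<>();
        Node(String name, boolean producesUpdates) {
            this.name = name;
            this.producesUpdates = producesUpdates;
        }
    }

    // One recursive bottom-up pass; no repeated re-optimization of the DAG.
    static Mode infer(Node node) {
        boolean updating = node.producesUpdates;
        for (Node input : node.inputs) {
            updating |= infer(input) == Mode.UPDATING;
        }
        return updating ? Mode.UPDATING : Mode.INSERT_ONLY;
    }

    public static void main(String[] args) {
        Node scan = new Node("scan", false);
        Node agg = new Node("agg", true);
        agg.inputs.add(scan);
        Node sink = new Node("sink", false);
        sink.inputs.add(agg);
        System.out.println(infer(sink));
    }
}
```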
[jira] [Created] (FLINK-23962) UpdateKind trait is not propagated properly in changeLog inference for DAG optimizing
Shuo Cheng created FLINK-23962:
--
Summary: UpdateKind trait is not propagated properly in changeLog inference for DAG optimizing
Key: FLINK-23962
URL: https://issues.apache.org/jira/browse/FLINK-23962
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.13.2, 1.14.0
Reporter: Shuo Cheng
Fix For: 1.13.3, 1.14.1

For SQL jobs with multiple sinks, the plan is divided into RelNode blocks, and the changeLog mode must also be inferred across blocks. Currently, the UpdateKind trait is not propagated properly from parent blocks to child blocks for the following pattern:
{code:java}
block0 -> block1 -> block3
      \-> block2
{code}
In the above example, block3 requires UB (UPDATE_BEFORE) and block2 does not. For the Agg in block0, UB should be emitted, but the UpdateKind for block0 is inferred as ONLY_UPDATE_AFTER.
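The expected propagation rule can be sketched as follows (illustrative enum and method, not Flink's actual trait classes): when several parent blocks consume the same child block, the child must satisfy the strictest requirement, so if any parent needs UPDATE_BEFORE records, the child cannot be downgraded to ONLY_UPDATE_AFTER.

```java
import java.util.List;

// Hypothetical sketch of merging parent-block requirements for a shared
// child block: BEFORE_AND_AFTER wins over ONLY_UPDATE_AFTER.
public class UpdateKindPropagation {
    enum UpdateKind { ONLY_UPDATE_AFTER, BEFORE_AND_AFTER }

    static UpdateKind merge(List<UpdateKind> parentRequirements) {
        return parentRequirements.contains(UpdateKind.BEFORE_AND_AFTER)
                ? UpdateKind.BEFORE_AND_AFTER
                : UpdateKind.ONLY_UPDATE_AFTER;
    }

    public static void main(String[] args) {
        // block3 requires UPDATE_BEFORE, block2 does not: block0 must emit both.
        System.out.println(merge(List.of(UpdateKind.BEFORE_AND_AFTER,
                                         UpdateKind.ONLY_UPDATE_AFTER)));
    }
}
```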
[jira] [Created] (FLINK-23827) Fix ModifiedMonotonicity inference for some nodes
Shuo Cheng created FLINK-23827:
--
Summary: Fix ModifiedMonotonicity inference for some nodes
Key: FLINK-23827
URL: https://issues.apache.org/jira/browse/FLINK-23827
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.14.0
Reporter: Shuo Cheng
Fix For: 1.14.1

The ModifiedMonotonicity handlers do not handle some nodes properly, such as IntermediateTableScan, Deduplicate, and LookupJoin.
[jira] [Created] (FLINK-23310) Correct the `ModifyKindSetTrait` for `GroupWindowAggregate` when the input is an update stream
Shuo Cheng created FLINK-23310:
--
Summary: Correct the `ModifyKindSetTrait` for `GroupWindowAggregate` when the input is an update stream
Key: FLINK-23310
URL: https://issues.apache.org/jira/browse/FLINK-23310
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.13.0
Reporter: Shuo Cheng
Fix For: 1.14.0

Following FLINK-22781, group windows now support update input streams. Just like unbounded aggregates, group windows may also emit DELETE records, so the `ModifyKindSetTrait` for group windows should be modified as well.
[jira] [Created] (FLINK-23136) Make `addSource` method with specific boundedness in `StreamExecutionEnvironment` public
Shuo Cheng created FLINK-23136:
--
Summary: Make `addSource` method with specific boundedness in `StreamExecutionEnvironment` public
Key: FLINK-23136
URL: https://issues.apache.org/jira/browse/FLINK-23136
Project: Flink
Issue Type: Improvement
Components: Runtime / Configuration
Reporter: Shuo Cheng

Currently, when we add a source function by calling `addSource(sourceFunction, name, typeinfo)` on `StreamExecutionEnvironment`, the source's boundedness is set to CONTINUOUS_UNBOUNDED by default (FLINK-19392). However, for Table API / SQL jobs, batch sources are also created through this `addSource` method, so they are likewise set to CONTINUOUS_UNBOUNDED, which produces unexpected behavior. I think `StreamExecutionEnvironment` should expose an `addSource` method that allows users to pass a boundedness parameter.
[jira] [Created] (FLINK-23086) Add getRuntimeExecutionMode to StreamExecutionEnvironment
Shuo Cheng created FLINK-23086:
--
Summary: Add getRuntimeExecutionMode to StreamExecutionEnvironment
Key: FLINK-23086
URL: https://issues.apache.org/jira/browse/FLINK-23086
Project: Flink
Issue Type: Improvement
Components: Runtime / Configuration
Affects Versions: 1.13.0
Reporter: Shuo Cheng
Fix For: 1.14.0

`RuntimeExecutionMode` is used when creating a `StreamGraphGenerator` inside `StreamExecutionEnvironment`; however, when we want to create a `StreamGraphGenerator` outside `StreamExecutionEnvironment` (as in `ExecutorUtils#generateStreamGraph`), a `getRuntimeExecutionMode` accessor is needed.
[jira] [Created] (FLINK-22586) Improve precision derivation for decimal arithmetic
Shuo Cheng created FLINK-22586:
--
Summary: Improve precision derivation for decimal arithmetic
Key: FLINK-22586
URL: https://issues.apache.org/jira/browse/FLINK-22586
Project: Flink
Issue Type: Improvement
Components: Table SQL / Runtime
Affects Versions: 1.13.0
Reporter: Shuo Cheng
Fix For: 1.14.0

Currently the precision and scale derivation for decimal arithmetic is not correct. For example, consider the following query:
{code:java}
select cast('10.1' as decimal(38, 19)) * cast('10.2' as decimal(38, 19)) from T
{code}
The result is `null`, which may confuse users a lot, because the actual value is far from overflowing. The root cause is that the precision derivation for the above multiplication is:

(38, 19) * (38, 19) -> (38, 38)

So there is no space left for integral digits, which leads to null results.
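The derivation above can be sketched in plain Java. The `naive` variant mirrors the pre-fix behavior described in the issue (cap precision and scale at 38 independently); the `adjusted` variant shows a common remedy in the style of SQL Server's and Hive's rules — stated here as an assumption about the eventual fix, not Flink's exact formula: when the derived precision overflows 38, give integral digits priority and trim the scale, but never below a minimum of 6.

```java
// Hedged sketch of decimal precision/scale derivation for multiplication.
public class DecimalMultiplyDerivation {
    static final int MAX_PRECISION = 38;
    static final int MIN_SCALE = 6;

    // Pre-fix shape: (38,19) * (38,19) -> (38,38), zero integral digits.
    static int[] naive(int p1, int s1, int p2, int s2) {
        int p = Math.min(MAX_PRECISION, p1 + p2 + 1);
        int s = Math.min(MAX_PRECISION, s1 + s2);
        return new int[] {p, s};
    }

    // SQL Server/Hive-style adjustment (assumed, for illustration): preserve
    // integral digits by trimming the scale, down to a minimum of MIN_SCALE.
    static int[] adjusted(int p1, int s1, int p2, int s2) {
        int p = p1 + p2 + 1;
        int s = s1 + s2;
        if (p > MAX_PRECISION) {
            int intDigits = p - s;                 // digits left of the point
            int minScale = Math.min(s, MIN_SCALE); // keep at least this much scale
            s = Math.max(minScale, MAX_PRECISION - intDigits);
            p = MAX_PRECISION;
        }
        return new int[] {p, s};
    }

    public static void main(String[] args) {
        int[] bad = naive(38, 19, 38, 19);     // (38,38): no integral digits
        int[] good = adjusted(38, 19, 38, 19); // (38,6): integral digits kept
        System.out.println(bad[0] + "," + bad[1] + "  " + good[0] + "," + good[1]);
    }
}
```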
[jira] [Created] (FLINK-22304) Refactor some interfaces for TVF based window to improve the scalability
Shuo Cheng created FLINK-22304:
--
Summary: Refactor some interfaces for TVF based window to improve the scalability
Key: FLINK-22304
URL: https://issues.apache.org/jira/browse/FLINK-22304
Project: Flink
Issue Type: Improvement
Components: Table SQL / API
Affects Versions: 1.13.0
Reporter: Shuo Cheng
Fix For: 1.13.1

Refactor `WindowBuffer` and `WindowCombineFunction` to make the implementation more scalable.
[jira] [Created] (FLINK-22148) Planner rules should use RexCall#equals to check whether two RexCalls are equivalent
Shuo Cheng created FLINK-22148:
--
Summary: Planner rules should use RexCall#equals to check whether two RexCalls are equivalent
Key: FLINK-22148
URL: https://issues.apache.org/jira/browse/FLINK-22148
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.12.2
Reporter: Shuo Cheng
Fix For: 1.13.0

Reproduce the bug by adding the following test to `SemiAntiJoinTest`:
{code:java}
@Test
def testNotSimplifyJoinConditionWithSameDigest(): Unit = {
  val sqlQuery =
    """
      |SELECT a
      |FROM l
      |WHERE c NOT IN (
      |SELECT f FROM r WHERE f = c)
      |""".stripMargin
  util.verifyRelPlan(sqlQuery)
}
{code}
A CannotPlanException is thrown. This is because the Calcite planner normalizes a RexCall in its `equals` method (since 1.24), while Flink's planner rules still use `toString` to check whether two RexCalls are equivalent.
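The divergence between string comparison and normalized equality can be illustrated in plain Java (stand-in class, not Calcite's RexCall): here `equals` treats a commutative operator's operands as an unordered pair, in the spirit of Calcite's normalization, while `toString` keeps the written order, so two logically equal calls have different digests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative sketch: equals normalizes operand order, toString does not.
public class NormalizedCall {
    final String op;
    final List<String> operands;

    NormalizedCall(String op, String... operands) {
        this.op = op;
        this.operands = Arrays.asList(operands);
    }

    private static List<String> sorted(List<String> xs) {
        List<String> copy = new ArrayList<>(xs);
        Collections.sort(copy);
        return copy;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof NormalizedCall)) return false;
        NormalizedCall that = (NormalizedCall) o;
        return op.equals(that.op) && sorted(operands).equals(sorted(that.operands));
    }

    @Override public int hashCode() { return op.hashCode() * 31 + sorted(operands).hashCode(); }

    @Override public String toString() { return op + operands; }

    public static void main(String[] args) {
        NormalizedCall a = new NormalizedCall("=", "f", "c");
        NormalizedCall b = new NormalizedCall("=", "c", "f");
        System.out.println(a.equals(b));                       // true
        System.out.println(a.toString().equals(b.toString())); // false
    }
}
```

A rule that compares the strings concludes the two calls differ, while `equals` correctly identifies them, which is the mismatch the issue describes.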
[jira] [Created] (FLINK-22063) Lookup Join outputs wrong results in some scenarios
Shuo Cheng created FLINK-22063:
--
Summary: Lookup Join outputs wrong results in some scenarios
Key: FLINK-22063
URL: https://issues.apache.org/jira/browse/FLINK-22063
Project: Flink
Issue Type: New Feature
Components: Table SQL / API
Affects Versions: 1.12.2
Reporter: Shuo Cheng

Reproduce the bug as follows: in LookupJoinITCase, given the SQL
{code:sql}
SELECT T.id, T.len, D.id, T.content, D.name
FROM src AS T
JOIN user_table for system_time as of T.proctime AS D
  ON T.id = D.id and cast(T.len as bigint) = D.id
{code}
the following execution plan is generated:
{code:java}
LegacySink(name=[DataStreamTableSink], fields=[id, len, id0, content, name])
+- Calc(select=[id, len, id0, content, name])
   +- LookupJoin(table=[**], joinType=[InnerJoin], async=[false], lookup=[id=len0], select=[id, len, content, len0, id, name])
      +- Calc(select=[id, len, content, CAST(len) AS len0])
         +- TableSourceScan(table=[[**]], fields=[id, len, content])
{code}
As we can see, the condition `T.id = D.id` is lost, so wrong results may be produced.
[jira] [Created] (FLINK-21613) Parse Compute Column with `IN` expression throws NPE
Shuo Cheng created FLINK-21613:
--
Summary: Parse Compute Column with `IN` expression throws NPE
Key: FLINK-21613
URL: https://issues.apache.org/jira/browse/FLINK-21613
Project: Flink
Issue Type: New Feature
Components: Table SQL / API
Affects Versions: 1.13.0
Reporter: Shuo Cheng

Consider the following SQL:
{code:sql}
CREATE TABLE MyInputFormatTable (
  `a` INT,
  `b` BIGINT,
  `c` STRING,
  `d` as `c` IN ('Hi', 'Hello')
) WITH (
  'connector' = 'values',
  'data-id' = '$dataId',
  'runtime-source' = 'InputFormat'
)
{code}
An NPE is thrown while parsing the query `select * from MyInputFormatTable`. It seems the commit "[hotfix][table-planner-blink] Simplify SQL expression to RexNode conversion" introduced this problem. The hotfix uses the method `SqlToRelConverter#convertExpression`, which does not have any tests and is not used anywhere in Calcite, so relying on it is unsafe. Maybe reverting the hotfix is a good choice. CC [~twalthr]
[jira] [Created] (FLINK-20826) Fix bug in streaming SQL examples
Shuo Cheng created FLINK-20826:
--
Summary: Fix bug in streaming SQL examples
Key: FLINK-20826
URL: https://issues.apache.org/jira/browse/FLINK-20826
Project: Flink
Issue Type: New Feature
Components: Table SQL / API
Affects Versions: 1.12.0
Reporter: Shuo Cheng
Fix For: 1.13.0

There are some minor bugs in `UpdatingTopCityExample`.
[jira] [Created] (FLINK-20670) Support query hints for Flink SQL
Shuo Cheng created FLINK-20670:
--
Summary: Support query hints for Flink SQL
Key: FLINK-20670
URL: https://issues.apache.org/jira/browse/FLINK-20670
Project: Flink
Issue Type: New Feature
Components: Table SQL / API
Reporter: Shuo Cheng
Fix For: 1.13.0

Flink now supports dynamic table options based on Calcite's hint framework. Beyond this, hint syntax is also very useful for query optimization: there is no perfect planner, and query hints provide a mechanism to direct the optimizer to choose an efficient query execution plan based on specific criteria. Currently, most popular databases and big-data frameworks support query hints; for example, join hints in Oracle are used as follows:
{code:sql}
SELECT /*+ USE_MERGE(employees departments) */ *
FROM employees, departments
WHERE employees.department_id = departments.department_id;
{code}
Flink also has several join strategies for batch jobs, i.e., Nested-Loop, Sort-Merge, and Hash Join, so it would be convenient for users to produce an efficient join execution plan if Flink supported join hints. Besides joins in batch jobs, join hints could also be used to support partitioned temporal table joins in the future. In a word, query hints, and join hints in particular, will benefit Flink users a lot.
[jira] [Created] (FLINK-17651) DecomposeGroupingSetsRule generates wrong plan when there exist distinct agg and simple agg with same filter
Shuo Cheng created FLINK-17651:
--
Summary: DecomposeGroupingSetsRule generates wrong plan when there exist distinct agg and simple agg with same filter
Key: FLINK-17651
URL: https://issues.apache.org/jira/browse/FLINK-17651
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.10.1
Reporter: Shuo Cheng
Fix For: 1.11.0

Consider adding the following test case to `org.apache.flink.table.planner.runtime.batch.sql.agg.AggregateITCaseBase`. As you can see, the actual result is wrong.
{code:java}
@Test
def testSimpleAndDistinctAggWithCommonFilter(): Unit = {
  val sql =
    """
      |SELECT
      |  h,
      |  COUNT(1) FILTER(WHERE d > 1),
      |  COUNT(1) FILTER(WHERE d < 2),
      |  COUNT(DISTINCT e) FILTER(WHERE d > 1)
      |FROM Table5
      |GROUP BY h
      |""".stripMargin
  checkResult(
    sql,
    Seq(
      row(1, 4, 1, 4),
      row(2, 7, 0, 7),
      row(3, 3, 0, 3)
    )
  )
}

Results:
== Correct Result ==    == Actual Result ==
1,4,1,4                 1,0,1,4
2,7,0,7                 2,0,0,7
3,3,0,3                 3,0,0,3
{code}
The problem lies in `DecomposeGroupingSetsRule`, which omits the filter arg of an aggregate call during some of its processing.
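A plain-Java oracle for the aggregate semantics above, over a tiny hypothetical dataset (not Flink's Table5): `COUNT(1) FILTER` counts the rows matching the predicate, and `COUNT(DISTINCT e) FILTER` counts distinct values of `e` among rows matching the same predicate, so the filter must not be dropped for either call.

```java
import java.util.List;

// Sketch of the expected FILTER semantics, usable as an oracle when
// checking what the decomposed plan should produce.
public class FilteredAggOracle {
    static class Row {
        final int d;
        final int e;
        Row(int d, int e) { this.d = d; this.e = e; }
    }

    // COUNT(1) FILTER (WHERE d > 1)
    static long countWhereDGreaterThan1(List<Row> rows) {
        return rows.stream().filter(r -> r.d > 1).count();
    }

    // COUNT(DISTINCT e) FILTER (WHERE d > 1)
    static long countDistinctEWhereDGreaterThan1(List<Row> rows) {
        return rows.stream().filter(r -> r.d > 1).map(r -> r.e).distinct().count();
    }

    public static void main(String[] args) {
        // Hypothetical rows (d, e); three rows satisfy d > 1, two distinct e among them.
        List<Row> rows = List.of(new Row(1, 10), new Row(2, 10), new Row(3, 10), new Row(2, 20));
        System.out.println(countWhereDGreaterThan1(rows));          // 3
        System.out.println(countDistinctEWhereDGreaterThan1(rows)); // 2
    }
}
```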
[jira] [Created] (FLINK-17649) Generated hash aggregate code may produce NPE when there exists an aggregate call with Filter.
Shuo Cheng created FLINK-17649:
--
Summary: Generated hash aggregate code may produce NPE when there exists an aggregate call with Filter.
Key: FLINK-17649
URL: https://issues.apache.org/jira/browse/FLINK-17649
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.10.1
Reporter: Shuo Cheng
Fix For: 1.11.0

When generating code for the Filter predicate of an aggregate call in `HashAggCodeGenHelper`, we should first check the nullability of the boolean field rather than call `getBoolean` directly; otherwise, an NPE may be produced.
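The shape of the fix can be sketched as follows (the `Row` type here is a hypothetical stand-in, not Flink's row classes): with a nullable BOOLEAN filter field, the generated code must consult `isNullAt` before `getBoolean`, treating NULL as "filter not satisfied", instead of unboxing directly.

```java
// Hedged sketch of the null-check-before-getBoolean pattern.
public class FilterEvalSketch {
    static class Row {
        private final Boolean[] fields;
        Row(Boolean... fields) { this.fields = fields; }
        boolean isNullAt(int i) { return fields[i] == null; }
        boolean getBoolean(int i) { return fields[i]; } // NPE if the field is null
    }

    // Pre-fix shape was effectively row.getBoolean(idx), which throws on null;
    // post-fix shape treats a NULL filter result as "do not aggregate this row".
    static boolean filterPasses(Row row, int idx) {
        return !row.isNullAt(idx) && row.getBoolean(idx);
    }

    public static void main(String[] args) {
        System.out.println(filterPasses(new Row(true), 0));           // true
        System.out.println(filterPasses(new Row((Boolean) null), 0)); // false, no NPE
    }
}
```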
[jira] [Created] (FLINK-16578) join lateral table function with condition fails with exception
Shuo Cheng created FLINK-16578:
--
Summary: join lateral table function with condition fails with exception
Key: FLINK-16578
URL: https://issues.apache.org/jira/browse/FLINK-16578
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Shuo Cheng

Reproducing:
{code:java}
// CorrelateITCase.scala
@Test
def testJoinTableFunction(): Unit = {
  registerFunction("func", new TableFunc2)
  val sql =
    """
      | select
      |   c, s, l
      | from inputT JOIN LATERAL TABLE(func(c)) as T(s, l)
      |   on s = c
      |""".stripMargin
  checkResult(sql, Seq())
}
{code}
The IT case fails with the exception "Cannot generate a valid execution plan for the given query". Firstly, for the given SQL, the logical plan produced by SqlToRelConverter is already wrong, which is a bug introduced by CALCITE-2004 and fixed in CALCITE-3847 (fix version 1.23). Secondly, even after that fix, we may still fail in `FlinkCorrelateVariablesValidationProgram`, because after decorrelation a correlate variable remains in a `LogicalFilter`. We should fix the validation problem.
[jira] [Created] (FLINK-16577) Exception will be thrown when computing columnInterval relmetadata in some cases
Shuo Cheng created FLINK-16577:
--
Summary: Exception will be thrown when computing columnInterval relmetadata in some cases
Key: FLINK-16577
URL: https://issues.apache.org/jira/browse/FLINK-16577
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Shuo Cheng
Fix For: 1.11.0
Attachments: image-2020-03-13-10-32-35-375.png

Consider the following SQL:
{code:java}
// a: INT, c: LONG
SELECT c, SUM(a) FROM T WHERE a > 0.1 AND a < 1 GROUP BY c
{code}
Here the SQL type of 0.1 is DECIMAL and that of 1 is INTEGER; both are in the NUMERIC type family and do not trigger type coercion, so the plan is:
{code:java}
FlinkLogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
+- FlinkLogicalCalc(select=[c, a], where=[AND(>(a, 0.1:DECIMAL(2, 1)), <(a, 1))])
   +- FlinkLogicalTableSourceScan(table=[[...]], fields=[a, b, c])
{code}
When we calculate the filtered column interval of the Calc, it leads to a validation exception of `FiniteValueInterval`: !image-2020-03-13-10-32-35-375.png!
[jira] [Created] (FLINK-15537) Type of keys should be `BinaryRow` when manipulating map state with `BaseRow` as key type.
Shuo Cheng created FLINK-15537:
--
Summary: Type of keys should be `BinaryRow` when manipulating map state with `BaseRow` as key type.
Key: FLINK-15537
URL: https://issues.apache.org/jira/browse/FLINK-15537
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.9.1
Reporter: Shuo Cheng
Fix For: 1.11.0

`BaseRow` is serialized and deserialized as `BinaryRow` by default, so when the key type of a map state is `BaseRow`, we should construct the map keys as `BinaryRow` when getting values from the map state; otherwise, you would always get null. Try it with the following SQL:
{code:java}
// (b: Int, c: String)
SELECT b, listagg(DISTINCT c, '#') FROM MyTable GROUP BY b
{code}
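The failure mode generalizes to any keyed state: if entries were written with keys in one concrete representation, probing with a logically equal key in a different representation whose `equals`/`hashCode` do not match always returns null. A plain-Java illustration with stand-in classes (not Flink's `BaseRow`/`BinaryRow`):

```java
import java.util.HashMap;
import java.util.Map;

// Generic sketch: the probe key must be converted to the representation the
// map was populated with, or every lookup misses.
public class KeyRepresentationDemo {
    // The representation the state actually stores.
    static class BinaryKey {
        final int value;
        BinaryKey(int value) { this.value = value; }
        @Override public boolean equals(Object o) {
            return o instanceof BinaryKey && ((BinaryKey) o).value == value;
        }
        @Override public int hashCode() { return value; }
    }

    // A different runtime representation of the same logical key;
    // equals/hashCode deliberately not aligned with BinaryKey.
    static class GenericKey {
        final int value;
        GenericKey(int value) { this.value = value; }
        BinaryKey toBinary() { return new BinaryKey(value); } // the needed conversion
    }

    public static void main(String[] args) {
        Map<Object, String> state = new HashMap<>();
        state.put(new BinaryKey(42), "hello");

        GenericKey probe = new GenericKey(42);
        System.out.println(state.get(probe));            // null: wrong representation
        System.out.println(state.get(probe.toBinary())); // hello
    }
}
```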
[jira] [Created] (FLINK-15466) `FlinkAggregateExpandDistinctAggregatesRule` generates wrong plan for cases that have distinct aggs with filter.
Shuo Cheng created FLINK-15466:
--
Summary: `FlinkAggregateExpandDistinctAggregatesRule` generates wrong plan for cases that have distinct aggs with filter.
Key: FLINK-15466
URL: https://issues.apache.org/jira/browse/FLINK-15466
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.9.0, 1.8.3
Reporter: Shuo Cheng
Fix For: 1.10.0
Attachments: image-2020-01-03-14-20-54-887.png

For the following SQL in batch mode:
{code:java}
SELECT a, COUNT(a), SUM(DISTINCT b) FILTER (WHERE a > 0) FROM MyTable GROUP BY a
{code}
the plan generated after the logical stage is as follows, which is not correct: the `Filter $4` should be `$f2 *and* $g_0`. !image-2020-01-03-14-20-54-887.png!