[jira] [Created] (FLINK-29990) Unparsed SQL for SqlTableLike cannot be parsed correctly

2022-11-10 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-29990:
--

 Summary: Unparsed SQL for SqlTableLike cannot be parsed correctly
 Key: FLINK-29990
 URL: https://issues.apache.org/jira/browse/FLINK-29990
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.0
Reporter: Shuo Cheng
 Fix For: 1.17.0


Consider the following DDL SQL:
{code:java}
create table source_table(
  a int,
  b bigint,
  c string
)
LIKE parent_table{code}
After being unparsed by the SQL parser, we get the following result:
{code:java}
CREATE TABLE `SOURCE_TABLE` (
  `A` INTEGER,
  `B` BIGINT,
  `C` STRING
)
LIKE `PARENT_TABLE` (
) {code}
An exception will be thrown when you try to parse the SQL above (note the spurious empty parentheses after the LIKE clause).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-24762) Remove agg function: INCR_SUM

2021-11-04 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-24762:
--

 Summary: Remove agg function: INCR_SUM
 Key: FLINK-24762
 URL: https://issues.apache.org/jira/browse/FLINK-24762
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Reporter: Shuo Cheng
 Fix For: 1.15.0


The removal of INCR_SUM should have been completed in FLINK-13529, but that PR only removed the function definition of INCR_SUM.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24598) Current IT cases do not cover the fallback path for hash aggregate

2021-10-20 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-24598:
--

 Summary: Current IT cases do not cover the fallback path for hash aggregate
 Key: FLINK-24598
 URL: https://issues.apache.org/jira/browse/FLINK-24598
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.14.0, 1.15.0
Reporter: Shuo Cheng


The test data in AggregateITCaseBase#testBigData is not large enough to trigger the hash aggregate to sort and spill its buffer and fall back to sort aggregate.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24048) Move changeLog inference out of optimizing phase

2021-08-30 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-24048:
--

 Summary: Move changeLog inference out of optimizing phase
 Key: FLINK-24048
 URL: https://issues.apache.org/jira/browse/FLINK-24048
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.14.0
Reporter: Shuo Cheng
 Fix For: 1.15.0


Currently, when there are multiple sinks in a SQL job, the DAG is split into multiple relNode blocks; as changeLog inference happens in the optimizing phase, we need to propagate the changeLog mode among blocks so that each block can generate an accurate physical plan.

In the current solution, the DAG is optimized three times, which is inefficient. Actually, we can optimize the DAG just once, expand it to a physical node tree, and then infer the changeLog mode. In this way, the DAG is optimized only one time.

(Similarly, the minibatch interval can also be inferred in the same way.)
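The proposed ordering can be sketched as follows (a minimal Python model; all names are illustrative stand-ins, not Flink's actual APIs):

```python
# Minimal model of the proposed pipeline: each relNode block is optimized
# exactly once, the blocks are expanded into a single physical node tree,
# and changeLog inference runs as a separate post-pass over that tree.
def optimize(block):
    # Stand-in for per-block plan optimization.
    return {"name": block, "optimized": True}

def expand_to_physical_tree(blocks):
    # Stand-in for merging the optimized blocks into one physical node tree.
    return {"nodes": blocks}

def infer_changelog_mode(tree):
    # Stand-in for the dedicated changeLog inference pass (placeholder value).
    for node in tree["nodes"]:
        node["changelog_mode"] = "INSERT_ONLY"
    return tree

def compile_dag(blocks):
    optimized = [optimize(b) for b in blocks]  # the DAG is optimized only once
    return infer_changelog_mode(expand_to_physical_tree(optimized))

tree = compile_dag(["block0", "block1"])
```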



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23962) UpdateKind trait is not propagated properly in changeLog inference for DAG optimizing

2021-08-25 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-23962:
--

 Summary: UpdateKind trait is not propagated properly in changeLog 
inference for DAG optimizing
 Key: FLINK-23962
 URL: https://issues.apache.org/jira/browse/FLINK-23962
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.13.2, 1.14.0
Reporter: Shuo Cheng
 Fix For: 1.13.3, 1.14.1


For SQL jobs with multiple sinks, the plan is divided into relNode blocks, and the changeLog mode should also be inferred across blocks. Currently, the updateKind trait is not propagated properly from parent blocks to child blocks for the following pattern:

block0 -> block1 -> block3
                \-> block2

In the above example, block3 requires UB (UPDATE_BEFORE) while block2 does not.

For the Agg in block0, UB should be emitted, but the updateKind for block0 is inferred as ONLY_UPDATE_AFTER.
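The merging logic the fix needs can be sketched as follows (a minimal Python model; constant and function names are illustrative, not Flink's actual trait classes):

```python
# When a block feeds several parent blocks, its update-kind requirement
# must be the union of what all parents need, not whichever parent was
# visited last. Names here are illustrative stand-ins.
BEFORE_AND_AFTER = "BEFORE_AND_AFTER"      # emits UPDATE_BEFORE (UB) and UPDATE_AFTER
ONLY_UPDATE_AFTER = "ONLY_UPDATE_AFTER"    # emits only UPDATE_AFTER

def merge_update_kind(parent_requirements):
    # If any consumer needs UPDATE_BEFORE, the producer must emit it.
    if BEFORE_AND_AFTER in parent_requirements:
        return BEFORE_AND_AFTER
    return ONLY_UPDATE_AFTER

# block1 feeds block3 (needs UB) and block2 (does not): block0 must emit UB.
merged = merge_update_kind([BEFORE_AND_AFTER, ONLY_UPDATE_AFTER])
```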



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23827) Fix ModifiedMonotonicity inference for some node

2021-08-16 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-23827:
--

 Summary: Fix ModifiedMonotonicity inference for some node
 Key: FLINK-23827
 URL: https://issues.apache.org/jira/browse/FLINK-23827
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.14.0
Reporter: Shuo Cheng
 Fix For: 1.14.1


The ModifiedMonotonicity handler does not handle some nodes properly, such as IntermediateTableScan, Deduplicate and LookupJoin.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23310) Correct the `ModifyKindSetTrait` for `GroupWindowAggregate` when the input is an update stream

2021-07-08 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-23310:
--

 Summary: Correct the `ModifyKindSetTrait` for 
`GroupWindowAggregate` when the input is an update stream 
 Key: FLINK-23310
 URL: https://issues.apache.org/jira/browse/FLINK-23310
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.13.0
Reporter: Shuo Cheng
 Fix For: 1.14.0


Following FLINK-22781, group window now supports update input streams. Just like the unbounded aggregate, the group window may also emit DELETE records, so the `ModifyKindSetTrait` for group window should be modified as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23136) Make `addSource` method with specific boundedness in `StreamExecutionEnvironment` public

2021-06-24 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-23136:
--

 Summary: Make `addSource` method with specific boundedness in 
`StreamExecutionEnvironment` public
 Key: FLINK-23136
 URL: https://issues.apache.org/jira/browse/FLINK-23136
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Shuo Cheng


Currently, when we add a source function by calling `addSource(sourceFunction, name, typeinfo)` of `StreamExecutionEnvironment`, the source is set to CONTINUOUS_UNBOUNDED by default (FLINK-19392). However, for Table API / SQL jobs, the batch source is also created by calling the above `addSource` method, so it is likewise set to CONTINUOUS_UNBOUNDED, which produces unexpected behavior. I think `StreamExecutionEnvironment` should expose an `addSource` method that allows users to pass a boundedness parameter.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23086) Add getRuntimeExecutionMode to StreamExecutionEnvironment

2021-06-22 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-23086:
--

 Summary: Add getRuntimeExecutionMode to StreamExecutionEnvironment
 Key: FLINK-23086
 URL: https://issues.apache.org/jira/browse/FLINK-23086
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Affects Versions: 1.13.0
Reporter: Shuo Cheng
 Fix For: 1.14.0


`RuntimeExecutionMode` is used when creating a `StreamGraphGenerator` inside `StreamExecutionEnvironment`; however, when we want to create a `StreamGraphGenerator` outside `StreamExecutionEnvironment` (like in ExecutorUtils#generateStreamGraph), `getRuntimeExecutionMode` is needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22586) Improve precision derivation for decimal arithmetics

2021-05-06 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-22586:
--

 Summary: Improve precision derivation for decimal arithmetics
 Key: FLINK-22586
 URL: https://issues.apache.org/jira/browse/FLINK-22586
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.13.0
Reporter: Shuo Cheng
 Fix For: 1.14.0


Currently, precision and scale are not derived properly for decimal arithmetic. For example, consider the following query:
{code:java}
select cast('10.1' as decimal(38, 19)) * cast('10.2' as decimal(38, 19)) from 
T{code}
the result is `null`, which may confuse users a lot, because the result is actually far from overflowing.

The root cause is that the precision derivation for the above multiplication is:

(38, 19) * (38, 19) -> (38, 38)

So there is no room left for integral digits, which leads to the null result.
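One possible repair can be sketched as follows. The capping rule below mirrors SQL Server's decimal derivation and is an assumption for illustration, not necessarily the rule Flink should adopt: when the raw precision exceeds the maximum, keep the integral digits and shrink the scale, with a floor on the scale.

```python
# Hypothetical precision/scale derivation for decimal multiplication.
# The naive rule p = p1 + p2 + 1, s = s1 + s2 overflows the 38-digit cap
# for (38, 19) * (38, 19); capping must preserve the integral digits
# instead of silently producing scale 38 (no room for the integer part).
MAX_PRECISION = 38
MIN_ADJUSTED_SCALE = 6  # assumed floor, borrowed from SQL Server's rule

def derive_multiply(p1, s1, p2, s2):
    precision = p1 + p2 + 1
    scale = s1 + s2
    if precision > MAX_PRECISION:
        integral_digits = precision - scale
        # Keep the integral digits; reduce the scale, but not below the floor.
        scale = max(MAX_PRECISION - integral_digits, min(scale, MIN_ADJUSTED_SCALE))
        precision = MAX_PRECISION
    return precision, scale

# (38, 19) * (38, 19) now derives to (38, 6) instead of (38, 38),
# leaving 32 integral digits so the example result no longer becomes null.
```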



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22304) Refactor some interfaces for TVF based window to improve the scalability

2021-04-16 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-22304:
--

 Summary: Refactor some interfaces for TVF based window to improve 
the scalability
 Key: FLINK-22304
 URL: https://issues.apache.org/jira/browse/FLINK-22304
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.13.0
Reporter: Shuo Cheng
 Fix For: 1.13.1


Refactor `WindowBuffer` and `WindowCombineFunction` to make the implementation more scalable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22148) Planner rules should use RexCall#equals to check whether two rexCalls are equivalent

2021-04-08 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-22148:
--

 Summary: Planner rules should use RexCall#equals to check whether two rexCalls are equivalent
 Key: FLINK-22148
 URL: https://issues.apache.org/jira/browse/FLINK-22148
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.12.2
Reporter: Shuo Cheng
 Fix For: 1.13.0


Reproduce the bug by adding the following test to `SemiAntiJoinTest`:

{code:java}
@Test
def testNotSimplifyJoinConditionWithSameDigest(): Unit = {
  val sqlQuery =
    """
      |SELECT a
      |FROM l
      |WHERE c NOT IN (
      |  SELECT f FROM r WHERE f = c)
      |""".stripMargin
  util.verifyRelPlan(sqlQuery)
}
{code}
 

A CannotPlanException will be thrown. This is because the Calcite planner normalizes a RexCall in its `equals` method (since 1.24), while in Flink planner rules we still use toString to check whether two RexCalls are equivalent.
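The mismatch can be modeled in a few lines (illustrative Python, not Calcite's actual classes):

```python
# Since Calcite 1.24, RexCall#equals normalizes operands of symmetric
# operators such as EQUALS, so EQUALS(f, c) and EQUALS(c, f) compare equal
# even though their string digests differ. A rule that compares digests
# therefore misses the equivalence. This is a toy model of that behavior.
class Call:
    def __init__(self, op, operands):
        self.op, self.operands = op, tuple(operands)

    def digest(self):
        # toString-style digest: order-sensitive.
        return f"{self.op}({', '.join(self.operands)})"

    def _normalized(self):
        # Sort the operands of symmetric operators before comparing.
        ops = tuple(sorted(self.operands)) if self.op == "EQUALS" else self.operands
        return (self.op, ops)

    def __eq__(self, other):
        return isinstance(other, Call) and self._normalized() == other._normalized()

    def __hash__(self):
        return hash(self._normalized())

a = Call("EQUALS", ["f", "c"])
b = Call("EQUALS", ["c", "f"])
# a == b under normalized equality, yet a.digest() != b.digest().
```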



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22063) Lookup Join outputs wrong results for some scenario

2021-03-31 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-22063:
--

 Summary: Lookup Join outputs wrong results for some scenario
 Key: FLINK-22063
 URL: https://issues.apache.org/jira/browse/FLINK-22063
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.12.2
Reporter: Shuo Cheng


Reproduce the bug as follows.

In LookupJoinITCase, given the SQL

{code:sql}
SELECT 
T.id, T.len, D.id, T.content, D.name 
FROM src AS T JOIN user_table for system_time as of T.proctime AS D 
ON T.id = D.id and cast(T.len as bigint) = D.id
{code}

the following execution plan is generated:

{code:java}
LegacySink(name=[DataStreamTableSink], fields=[id, len, id0, content, name])
+- Calc(select=[id, len, id0, content, name])
   +- LookupJoin(table=[**], joinType=[InnerJoin], async=[false], lookup=[id=len0], select=[id, len, content, len0, id, name])
      +- Calc(select=[id, len, content, CAST(len) AS len0])
         +- TableSourceScan(table=[[**]], fields=[id, len, content])
{code}

As we can see, the condition `T.id = D.id` is lost, so wrong results may be produced.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21613) Parse Compute Column with `IN` expression throws NPE

2021-03-04 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-21613:
--

 Summary: Parse Compute Column with `IN` expression throws NPE
 Key: FLINK-21613
 URL: https://issues.apache.org/jira/browse/FLINK-21613
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.13.0
Reporter: Shuo Cheng


Consider the following SQL:

{code:sql}
CREATE TABLE MyInputFormatTable (
  `a` INT,
  `b` BIGINT,
  `c` STRING,
  `d` as `c` IN ('Hi', 'Hello')
) WITH (
  'connector' = 'values',
  'data-id' = '$dataId',
  'runtime-source' = 'InputFormat'
)
{code}

An NPE will be thrown while parsing the query
`select * from MyInputFormatTable`.

It seems the commit "[hotfix][table-planner-blink] Simplify SQL expression
to RexNode conversion" introduced this problem. The hotfix uses the method
`SqlToRelConverter#convertExpression`, which has no tests and is not used
anywhere else in Calcite, so relying on it is unsafe. Maybe reverting the
hotfix is a good choice.

CC [~twalthr]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20826) Fix bug in streaming SQL examples

2020-12-31 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-20826:
--

 Summary: Fix bug in streaming SQL examples
 Key: FLINK-20826
 URL: https://issues.apache.org/jira/browse/FLINK-20826
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.12.0
Reporter: Shuo Cheng
 Fix For: 1.13.0


There are some minor bugs in `UpdatingTopCityExample`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20670) Support query hints for Flink SQL

2020-12-17 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-20670:
--

 Summary: Support query hints for Flink SQL
 Key: FLINK-20670
 URL: https://issues.apache.org/jira/browse/FLINK-20670
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: Shuo Cheng
 Fix For: 1.13.0


Flink now supports dynamic table options based on the hint framework of Calcite.
Besides this, hint syntax is also very useful for query optimization: there is no perfect planner, and query hints provide a mechanism to direct the optimizer to choose an efficient query execution plan based on specific criteria.

Currently, most popular databases and big-data frameworks support query hints; for example, join hints in Oracle are used as follows:

{code:sql}
SELECT /*+ USE_MERGE(employees departments) */ *
FROM employees, departments 
WHERE employees.department_id = departments.department_id; 
{code}

Flink also has several join strategies for batch jobs, i.e., Nested-Loop, Sort-Merge and Hash Join; it would be convenient for users to produce an efficient join execution plan if Flink supported join hints. Besides joins in batch jobs, it is also possible to use join hints to support partitioned temporal table joins in the future. In a word, query hints, and especially join hints for now, will benefit Flink users a lot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17651) DecomposeGroupingSetsRule generates wrong plan when there exist distinct agg and simple agg with same filter

2020-05-13 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-17651:
--

 Summary: DecomposeGroupingSetsRule generates wrong plan when there 
exist distinct agg and simple agg with same filter
 Key: FLINK-17651
 URL: https://issues.apache.org/jira/browse/FLINK-17651
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.1
Reporter: Shuo Cheng
 Fix For: 1.11.0


Consider adding the following test case to 
org.apache.flink.table.planner.runtime.batch.sql.agg.AggregateITCaseBase. As 
you can see, the actual result is wrong.

 
{code:java}
@Test
def testSimpleAndDistinctAggWithCommonFilter(): Unit = {
  val sql =
    """
      |SELECT
      |  h,
      |  COUNT(1) FILTER(WHERE d > 1),
      |  COUNT(1) FILTER(WHERE d < 2),
      |  COUNT(DISTINCT e) FILTER(WHERE d > 1)
      |FROM Table5
      |GROUP BY h
      |""".stripMargin
  checkResult(
    sql,
    Seq(
      row(1, 4, 1, 4),
      row(2, 7, 0, 7),
      row(3, 3, 0, 3)
    )
  )
}

== Correct Result ==   == Actual Result ==
1,4,1,4                1,0,1,4
2,7,0,7                2,0,0,7
3,3,0,3                3,0,0,3
{code}
The problem lies in `DecomposeGroupingSetsRule`, which omits the filter arg of the aggregate call when doing some processing.
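A minimal model of the fix (illustrative Python, not the rule's actual code): whatever index remapping the rule applies to an aggregate call's arguments must also be applied to its filter argument rather than dropping it.

```python
# When a rule rewrites aggregate calls (e.g. while decomposing grouping
# sets), every field of the call must be carried over, including the
# filter argument; dropping it silently removes the FILTER clause.
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class AggCall:
    func: str
    args: tuple
    distinct: bool = False
    filter_arg: int = -1  # -1 means "no FILTER"

def remap(call, mapping):
    # Correct behavior: remap the argument indices AND the filter index.
    new_args = tuple(mapping[a] for a in call.args)
    new_filter = mapping[call.filter_arg] if call.filter_arg >= 0 else -1
    return replace(call, args=new_args, filter_arg=new_filter)

call = AggCall("COUNT", (1,), filter_arg=3)
remapped = remap(call, {1: 0, 3: 2})  # FILTER survives the rewrite
```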

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17649) Generated hash aggregate code may produce NPE when there exists an aggregate call with Filter.

2020-05-13 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-17649:
--

 Summary: Generated hash aggregate code may produce NPE when there 
exists an aggregate call with Filter.
 Key: FLINK-17649
 URL: https://issues.apache.org/jira/browse/FLINK-17649
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.1
Reporter: Shuo Cheng
 Fix For: 1.11.0


When generating code for the FILTER predicate of an aggregate call in `HashAggCodeGenHelper`, we should first check the nullability of the boolean field rather than calling `getBoolean` directly; otherwise, an NPE may be produced.
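The null-check-first pattern can be sketched as follows (in Python for brevity; Flink's generated code is Java, and the names below are illustrative):

```python
# When evaluating an aggregate call's FILTER predicate over a nullable
# BOOLEAN column, check for null before reading the value; reading a null
# field directly is what raises the NPE in the generated Java code.
def filter_accepts(row, field):
    value = row.get(field)  # None models SQL NULL
    if value is None:       # nullability check comes first
        return False        # a NULL predicate filters the row out
    return bool(value)      # only now is it safe to read the boolean
```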



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16578) join lateral table function with condition fails with exception

2020-03-12 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-16578:
--

 Summary: join lateral table function with condition fails with 
exception
 Key: FLINK-16578
 URL: https://issues.apache.org/jira/browse/FLINK-16578
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Shuo Cheng


Reproduce the bug as follows:

 
{code:java}
// CorrelateITCase.scala
@Test
def testJoinTableFunction(): Unit = {
  registerFunction("func", new TableFunc2)
  val sql =
"""
  | select
  |   c, s, l
  | from inputT JOIN LATERAL TABLE(func(c)) as T(s, l)
  | on s = c
  |""".stripMargin
  checkResult(sql, Seq())
}
{code}
The IT case fails with the exception: "Cannot generate a valid execution plan for the given query".

Firstly, for the given SQL, the logical plan produced by SqlToRelConverter is already wrong, which is a bug introduced by CALCITE-2004 and fixed in CALCITE-3847 (fix version 1.23).

Secondly, even after the fix, we may fail in `FlinkCorrelateVariablesValidationProgram`, because after decorrelating there still exists a correlate variable in a `LogicalFilter`. We should fix this validation problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16577) Exception will be thrown when computing columnInterval relmetadata in some case

2020-03-12 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-16577:
--

 Summary: Exception will be thrown when computing columnInterval 
relmetadata in some case
 Key: FLINK-16577
 URL: https://issues.apache.org/jira/browse/FLINK-16577
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Shuo Cheng
 Fix For: 1.11.0
 Attachments: image-2020-03-13-10-32-35-375.png

Consider the following SQL:

 
{code:java}
// a: INT, c: LONG
SELECT 
c, SUM(a) 
FROM T 
WHERE a > 0.1 AND a < 1 
GROUP BY c{code}
 

Here the SQL type of 0.1 is DECIMAL and that of 1 is INTEGER; they are both in the NUMERIC type family and do not trigger type coercion, so the plan is:
{code:java}
FlinkLogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
+- FlinkLogicalCalc(select=[c, a], where=[AND(>(a, 0.1:DECIMAL(2, 1)), <(a, 1))])
   +- FlinkLogicalTableSourceScan(table=[[...]], fields=[a, b, c])
{code}
When we calculate the filtered column interval of the Calc node, it leads to a validation exception in `FiniteValueInterval`:

!image-2020-03-13-10-32-35-375.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15537) Type of keys should be `BinaryRow` when manipulating map state with `BaseRow` as key type.

2020-01-09 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-15537:
--

 Summary: Type of keys should be `BinaryRow` when manipulating map 
state with `BaseRow` as key type.
 Key: FLINK-15537
 URL: https://issues.apache.org/jira/browse/FLINK-15537
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.1
Reporter: Shuo Cheng
 Fix For: 1.11.0


`BaseRow` is serialized and deserialized as `BinaryRow` by default, so when the key type of the map state is `BaseRow`, we should construct map keys of type `BinaryRow` to look up values in the map state; otherwise, you will always get null...

Try it with the following SQL:
{code:java}
// (b: Int, c: String)
SELECT 
  b, listagg(DISTINCT c, '#')
FROM MyTable
GROUP BY b
{code}
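The failure mode can be modeled in a few lines (illustrative Python classes, not Flink's actual BaseRow/BinaryRow implementations): if keys are stored in one representation but looked up with another whose hash/equals differ, every lookup misses.

```python
# Map state keyed by the serialized representation (BinaryRow): probing
# with a different row representation never matches, so the lookup
# always returns None, mirroring the "always get null" symptom.
class GenericRow:
    def __init__(self, fields):
        self.fields = tuple(fields)
    # No __eq__/__hash__: identity semantics, unlike BinaryRow.

class BinaryRow:
    def __init__(self, fields):
        self.fields = tuple(fields)
    def __eq__(self, other):
        return isinstance(other, BinaryRow) and self.fields == other.fields
    def __hash__(self):
        return hash(self.fields)

state = {BinaryRow(["Hello"]): 1}        # keys stored as BinaryRow
missing = state.get(GenericRow(["Hello"]))  # wrong key representation
found = state.get(BinaryRow(["Hello"]))     # key constructed as BinaryRow
```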
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15466) `FlinkAggregateExpandDistinctAggregatesRule` generates wrong plan for cases that have distinct aggs with filter.

2020-01-02 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-15466:
--

 Summary: `FlinkAggregateExpandDistinctAggregatesRule` generates 
wrong plan for cases that have distinct aggs with filter.
 Key: FLINK-15466
 URL: https://issues.apache.org/jira/browse/FLINK-15466
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.9.0, 1.8.3
Reporter: Shuo Cheng
 Fix For: 1.10.0
 Attachments: image-2020-01-03-14-20-54-887.png

For the following SQL in batch mode:

 
{code:java}
SELECT 
   a, COUNT(a), SUM(DISTINCT b) FILTER (WHERE a > 0)
FROM MyTable 
GROUP BY a{code}
 

the plan generated after the logical stage is as follows, which is not correct: the `Filter $4` should be `$f2 *and* $g_0`.

!image-2020-01-03-14-20-54-887.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)