[jira] [Commented] (FLINK-34633) Support unnesting array constants

2024-06-08 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853436#comment-17853436
 ] 

Jeyhun Karimov commented on FLINK-34633:


Hi [~fanrui] 

Yes, since patch versions should not include new features, this PR should not 
be part of the 1.19.1 release. 

IMO, the fix version should be updated to 1.20.0. Do you agree, [~qingyue]?

> Support unnesting array constants
> -
>
> Key: FLINK-34633
> URL: https://issues.apache.org/jira/browse/FLINK-34633
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Assignee: Jeyhun Karimov
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.1
>
>
> It seems that the current planner doesn't support using UNNEST on array 
> constants.(x)
> {code:java}
> SELECT * FROM UNNEST(ARRAY[1,2,3]);{code}
>  
> The following query can't be compiled.(x)
> {code:java}
> SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]){code}
>  
> The rewritten version works. (/)
> {code:java}
> SELECT * FROM (SELECT *, ARRAY[1,2,3] AS A FROM (VALUES('a'))) CROSS JOIN 
> UNNEST(A){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34633) Support unnesting array constants

2024-06-08 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853436#comment-17853436
 ] 

Jeyhun Karimov edited comment on FLINK-34633 at 6/8/24 9:35 PM:


Hi [~fanrui] 

Very good point. Yes, since patch versions should not include new features, 
this PR should not be part of the 1.19.1 release. 

IMO, the fix version should be updated to 1.20.0. Do you agree, [~qingyue]?


was (Author: jeyhunkarimov):
Hi [~fanrui] 

Yes, since patch versions should not include new features, this PR should not 
be part of the 1.19.1 release. 

IMO, the fix version should be updated to 1.20.0. Do you agree, [~qingyue]?

> Support unnesting array constants
> -
>
> Key: FLINK-34633
> URL: https://issues.apache.org/jira/browse/FLINK-34633
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Assignee: Jeyhun Karimov
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.1
>
>
> It seems that the current planner doesn't support using UNNEST on array 
> constants.(x)
> {code:java}
> SELECT * FROM UNNEST(ARRAY[1,2,3]);{code}
>  
> The following query can't be compiled.(x)
> {code:java}
> SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]){code}
>  
> The rewritten version works. (/)
> {code:java}
> SELECT * FROM (SELECT *, ARRAY[1,2,3] AS A FROM (VALUES('a'))) CROSS JOIN 
> UNNEST(A){code}





[jira] [Commented] (FLINK-35417) JobManager and TaskManager support merging and run in a single process

2024-05-22 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848618#comment-17848618
 ] 

Jeyhun Karimov commented on FLINK-35417:


Hi [~melin], I echo the comments above. In case you are not planning to work 
on this, I can spend some cycles driving this issue.

> JobManager and TaskManager support merging and run in a single process
> --
>
> Key: FLINK-35417
> URL: https://issues.apache.org/jira/browse/FLINK-35417
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: melin
>Priority: Major
>
> Flink is widely used in data integration scenarios, where parallelism is 
> usually low and in many cases a single parallel instance can run a task. 
> With high availability in application mode, the large number of JobManager 
> nodes costs a lot of resources. If session mode is used instead, stability 
> is not high.
> In application mode, the JobManager and TaskManager could run together in a 
> single process to achieve reliability and save resources.
>  
>  





[jira] [Commented] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846759#comment-17846759
 ] 

Jeyhun Karimov commented on FLINK-35289:


 
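{code:java}
// Element type inferred from the Tuple4<>(1, 1, 1.0, 1L) literal in onTimer.
carData.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple4<Integer, Integer, Double, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner((car, ts) -> car.f0))
        .keyBy(value -> value.f0)
        .process(new KeyedProcessFunction<
                Integer,
                Tuple4<Integer, Integer, Double, Long>,
                Tuple4<Integer, Integer, Double, Long>>() {
            @Override
            public void processElement(
                    Tuple4<Integer, Integer, Double, Long> value,
                    Context ctx,
                    Collector<Tuple4<Integer, Integer, Double, Long>> out)
                    throws Exception {
                // Register a timer at the largest possible timestamp.
                ctx.timerService().registerProcessingTimeTimer(Long.MAX_VALUE);
            }

            @Override
            public void onTimer(
                    long timestamp,
                    OnTimerContext ctx,
                    Collector<Tuple4<Integer, Integer, Double, Long>> out)
                    throws Exception {
                out.collect(new Tuple4<>(1, 1, 1.0, 1L));
            }
        })
        .process(new ProcessFunction<
                Tuple4<Integer, Integer, Double, Long>,
                Tuple4<Integer, Integer, Double, Long>>() {
            @Override
            public void processElement(
                    Tuple4<Integer, Integer, Double, Long> value,
                    Context ctx,
                    Collector<Tuple4<Integer, Integer, Double, Long>> out)
                    throws Exception {
                // LOG stands for any logging call; it prints the element timestamp.
                LOG(ctx.timestamp());
            }
        });{code}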
 

If this is the example you mean, then the second process function does not 
output a timestamp of Long.MAX_VALUE. 

 

If you have something else in mind, please provide a minimal reproducible 
example. Thanks

 

> Incorrect timestamp of stream elements collected from onTimer in batch mode
> ---
>
> Key: FLINK-35289
> URL: https://issues.apache.org/jira/browse/FLINK-35289
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.18.1
>Reporter: Kanthi Vaidya
>Priority: Major
>
> In batch mode, all registered timers fire at the end of time. Given this, if 
> a user registers a timer for Long.MAX_VALUE, the timestamp assigned to the 
> elements collected from the onTimer context ends up being Long.MAX_VALUE. 
> Ideally, this should be the time at which the batch job actually executed 
> the onTimer function.





[jira] [Commented] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846631#comment-17846631
 ] 

Jeyhun Karimov commented on FLINK-35289:


Hi [~ekanthi] could you please provide a concrete example? According to your 
explanation, if

 
{code:java}
ctx.timerService().registerProcessingTimeTimer(Long.MAX_VALUE); {code}
and
{code:java}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) 
throws Exception {
out.collect(111);
} {code}
then the collected value (111) will have a timestamp of Long.MAX_VALUE. 

However, when we check the implementation of TimestampedCollector#collect, we 
can see that this is not the case. The stream record will preserve its existing 
timestamp. 

 

Am I missing something?

 

Thanks!

> Incorrect timestamp of stream elements collected from onTimer in batch mode
> ---
>
> Key: FLINK-35289
> URL: https://issues.apache.org/jira/browse/FLINK-35289
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.18.1
>Reporter: Kanthi Vaidya
>Priority: Major
>
> In batch mode, all registered timers fire at the end of time. Given this, if 
> a user registers a timer for Long.MAX_VALUE, the timestamp assigned to the 
> elements collected from the onTimer context ends up being Long.MAX_VALUE. 
> Ideally, this should be the time at which the batch job actually executed 
> the onTimer function.





[jira] [Reopened] (FLINK-34379) table.optimizer.dynamic-filtering.enabled lead to OutOfMemoryError

2024-05-15 Thread Jeyhun Karimov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reopened FLINK-34379:


Reopening because a patch is needed as a result of this comment: 
https://github.com/apache/flink/pull/24600#discussion_r1600843684

> table.optimizer.dynamic-filtering.enabled lead to OutOfMemoryError
> --
>
> Key: FLINK-34379
> URL: https://issues.apache.org/jira/browse/FLINK-34379
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.2, 1.18.1
> Environment: 1.17.1
>Reporter: zhu
>Assignee: Jeyhun Karimov
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.3, 1.18.2, 1.20.0, 1.19.1
>
>
> When running a batch job, I UNION ALL about 50 tables and then join the 
> result with another table. Compiling the execution plan throws 
> OutOfMemoryError: Java heap space, which was not a problem in 1.15.2. 
> However, both 1.17.2 and 1.18.1 throw the same error, which causes the 
> JobManager to restart. I have found that this is caused by 
> table.optimizer.dynamic-filtering.enabled, which defaults to true. When I set 
> table.optimizer.dynamic-filtering.enabled to false, the job compiles and 
> executes normally.
> code
> TableEnvironment.create(EnvironmentSettings.newInstance()
> .withConfiguration(configuration)
> .inBatchMode().build())
> sql=select att,filename,'table0' as mo_name from table0 UNION All select 
> att,filename,'table1' as mo_name from table1 UNION All select 
> att,filename,'table2' as mo_name from table2 UNION All select 
> att,filename,'table3' as mo_name from table3 UNION All select 
> att,filename,'table4' as mo_name from table4 UNION All select 
> att,filename,'table5' as mo_name from table5 UNION All select 
> att,filename,'table6' as mo_name from table6 UNION All select 
> att,filename,'table7' as mo_name from table7 UNION All select 
> att,filename,'table8' as mo_name from table8 UNION All select 
> att,filename,'table9' as mo_name from table9 UNION All select 
> att,filename,'table10' as mo_name from table10 UNION All select 
> att,filename,'table11' as mo_name from table11 UNION All select 
> att,filename,'table12' as mo_name from table12 UNION All select 
> att,filename,'table13' as mo_name from table13 UNION All select 
> att,filename,'table14' as mo_name from table14 UNION All select 
> att,filename,'table15' as mo_name from table15 UNION All select 
> att,filename,'table16' as mo_name from table16 UNION All select 
> att,filename,'table17' as mo_name from table17 UNION All select 
> att,filename,'table18' as mo_name from table18 UNION All select 
> att,filename,'table19' as mo_name from table19 UNION All select 
> att,filename,'table20' as mo_name from table20 UNION All select 
> att,filename,'table21' as mo_name from table21 UNION All select 
> att,filename,'table22' as mo_name from table22 UNION All select 
> att,filename,'table23' as mo_name from table23 UNION All select 
> att,filename,'table24' as mo_name from table24 UNION All select 
> att,filename,'table25' as mo_name from table25 UNION All select 
> att,filename,'table26' as mo_name from table26 UNION All select 
> att,filename,'table27' as mo_name from table27 UNION All select 
> att,filename,'table28' as mo_name from table28 UNION All select 
> att,filename,'table29' as mo_name from table29 UNION All select 
> att,filename,'table30' as mo_name from table30 UNION All select 
> att,filename,'table31' as mo_name from table31 UNION All select 
> att,filename,'table32' as mo_name from table32 UNION All select 
> att,filename,'table33' as mo_name from table33 UNION All select 
> att,filename,'table34' as mo_name from table34 UNION All select 
> att,filename,'table35' as mo_name from table35 UNION All select 
> att,filename,'table36' as mo_name from table36 UNION All select 
> att,filename,'table37' as mo_name from table37 UNION All select 
> att,filename,'table38' as mo_name from table38 UNION All select 
> att,filename,'table39' as mo_name from table39 UNION All select 
> att,filename,'table40' as mo_name from table40 UNION All select 
> att,filename,'table41' as mo_name from table41 UNION All select 
> att,filename,'table42' as mo_name from table42 UNION All select 
> att,filename,'table43' as mo_name from table43 UNION All select 
> att,filename,'table44' as mo_name from table44 UNION All select 
> att,filename,'table45' as mo_name from table45 UNION All select 
> att,filename,'table46' as mo_name from table46 UNION All select 
> att,filename,'table47' as mo_name from table47 UNION All select 
> att,filename,'table48' as mo_name from table48 UNION All select 
> att,filename,'table49' as mo_name from table49 UNION All select 
> att,filename,'table50' as mo_name from 

[jira] [Commented] (FLINK-34902) INSERT INTO column mismatch leads to IndexOutOfBoundsException

2024-04-25 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840752#comment-17840752
 ] 

Jeyhun Karimov commented on FLINK-34902:


Hi [~twalthr] sure, I will check the issue

> INSERT INTO column mismatch leads to IndexOutOfBoundsException
> --
>
> Key: FLINK-34902
> URL: https://issues.apache.org/jira/browse/FLINK-34902
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: Jeyhun Karimov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> SQL:
> {code}
> INSERT INTO t (a, b) SELECT 1;
> {code}
>  
> Stack trace:
> {code}
> org.apache.flink.table.api.ValidationException: SQL validation failed. Index 
> 1 out of bounds for length 1
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
>     at
> Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for 
> length 1
>     at 
> java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>     at 
> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>     at 
> java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>     at java.base/java.util.Objects.checkIndex(Objects.java:374)
>     at java.base/java.util.ArrayList.get(ArrayList.java:459)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$reorder$1(PreValidateReWriter.scala:355)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$reorder$1$adapted(PreValidateReWriter.scala:355)
> {code}





[jira] [Commented] (FLINK-35098) Incorrect results for queries like "10 >= y" on tables using Filesystem connector and Orc format

2024-04-13 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836868#comment-17836868
 ] 

Jeyhun Karimov commented on FLINK-35098:


Hi [~empathy87], sorry, I was working on the issue in the time between when 
you created it and when you commented here, so I missed your comment. 
In any case, my submission might have some issues (it is now closed), so you 
are welcome to submit your PR. Thanks

> Incorrect results for queries like "10 >= y" on tables using Filesystem 
> connector and Orc format
> 
>
> Key: FLINK-35098
> URL: https://issues.apache.org/jira/browse/FLINK-35098
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ORC, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile)
>Affects Versions: 1.12.7, 1.13.6, 1.14.6, 1.15.4, 1.16.3, 1.17.2, 1.19.0, 
> 1.18.1
>Reporter: Andrey Gaskov
>Priority: Major
>  Labels: pull-request-available
>
> When working with ORC files, there is an issue with evaluation of SQL queries 
> containing expressions with a literal as the first operand. Specifically, the 
> query *10 >= y* does not always return the correct result.
> This test added to OrcFileSystemITCase.java fails on the second check:
>  
> {code:java}
> @TestTemplate
> void testOrcFilterPushDownLiteralFirst() throws ExecutionException, 
> InterruptedException {
> super.tableEnv()
> .executeSql("insert into orcLimitTable values('a', 10, 10)")
> .await();
> List<Row> expected = Collections.singletonList(Row.of(10));
> check("select y from orcLimitTable where y <= 10", expected);
> check("select y from orcLimitTable where 10 >= y", expected);
> }
> Results do not match for query:
>   select y from orcLimitTable where 10 >= y
> Results
>  == Correct Result - 1 ==   == Actual Result - 0 ==
> !+I[10]    {code}
> The checks are equivalent and should evaluate to the same result. But the 
> second query doesn't return the record with y=10.
> The table is defined as:
> {code:java}
> create table orcLimitTable (
> x string,
> y int,
> a int) 
> with (
> 'connector' = 'filesystem',
> 'path' = '/tmp/junit4374176500101507155/junit7109291529844202275/',
> 'format'='orc'){code}
>  
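The equivalence the description relies on can be sketched in plain Java: when a comparison has the literal as its first operand, swapping the operands means mirroring the operator (`>=` becomes `<=`), not keeping it unchanged. This is only an illustration of that rule. The `Op` enum and the `normalize` helper are hypothetical names and are not taken from Flink's ORC filter code.

```java
import java.util.function.IntPredicate;

public class LiteralFirstComparison {

    /** Comparison operators; hypothetical names for illustration only. */
    public enum Op {
        LT, LE, GT, GE, EQ;

        /** Operator to use after swapping operands: (lit OP col) == (col mirrored(OP) lit). */
        public Op mirrored() {
            switch (this) {
                case LT: return GT;
                case LE: return GE;
                case GT: return LT;
                case GE: return LE;
                default: return EQ;
            }
        }

        /** Evaluates "left OP right". */
        public boolean test(int left, int right) {
            switch (this) {
                case LT: return left < right;
                case LE: return left <= right;
                case GT: return left > right;
                case GE: return left >= right;
                default: return left == right;
            }
        }
    }

    /** Builds a column-first predicate equivalent to "literal op column". */
    public static IntPredicate normalize(int literal, Op op) {
        Op flipped = op.mirrored();
        return column -> flipped.test(column, literal);
    }

    public static void main(String[] args) {
        // "10 >= y" is rewritten to "y <= 10".
        IntPredicate p = normalize(10, Op.GE);
        System.out.println(p.test(10)); // prints "true"
        System.out.println(p.test(11)); // prints "false"
    }
}
```

With this rule, `10 >= y` and `y <= 10` select the same rows, which is what the two `check` calls in the test above expect.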





[jira] [Commented] (FLINK-31163) Unexpected correlate variable $cor0 in the plan error in where clause

2024-04-10 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835878#comment-17835878
 ] 

Jeyhun Karimov commented on FLINK-31163:


When I tried to reproduce the issue (as of 
3590c2d86f4186771ffcd64712f756d31306eb88), the given query in the issue 
executes and exits without any exceptions.

> Unexpected correlate variable $cor0 in the plan error in where clause
> -
>
> Key: FLINK-31163
> URL: https://issues.apache.org/jira/browse/FLINK-31163
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.16.0
>Reporter: P Rohan Kumar
>Priority: Major
>
> {code:java}
> val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(env)
> val accountsTd = 
> TableDescriptor.forConnector("datagen").option("rows-per-second", "10")
>   .option("number-of-rows", "10")
>   .schema(Schema
> .newBuilder()
> .column("account_num", DataTypes.VARCHAR(2147483647))
> .column("acc_name", DataTypes.VARCHAR(2147483647))
> .column("acc_phone_num", DataTypes.VARCHAR(2147483647))
> .build())
>   .build()
> val accountsTable = tableEnv.from(accountsTd)
> tableEnv.createTemporaryView("accounts", accountsTable)
> val transactionsTd = 
> TableDescriptor.forConnector("datagen").option("rows-per-second", "10")
>   .option("number-of-rows", "10")
>   .schema(Schema
> .newBuilder()
> .column("account_num", DataTypes.VARCHAR(2147483647))
> .column("transaction_place", DataTypes.VARCHAR(2147483647))
> .column("transaction_time", DataTypes.BIGINT())
> .column("amount", DataTypes.INT())
> .build())
>   .build()
> val transactionsTable = tableEnv.from(transactionsTd)
> tableEnv.createTemporaryView("transaction_data", transactionsTable)
> val newTable = tableEnv.sqlQuery("select   acc.account_num,  (select count(*) 
> from transaction_data where transaction_place = trans.transaction_place and 
> account_num = acc.account_num)  from  accounts acc,transaction_data trans")
> tableEnv.toChangelogStream(newTable).print()
> env.execute() {code}
> I get the following error if I run the above code.
>  
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> unexpected correlate variable $cor0 in the plan
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:59)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>     at scala.collection.immutable.Range.foreach(Range.scala:158)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>     at 

[jira] [Commented] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

2024-04-04 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833919#comment-17833919
 ] 

Jeyhun Karimov commented on FLINK-32513:


Hi [~martijnvisser], thanks for reporting it. Would it be possible to fix it on 
the Kafka connector side, to make it compatible with releases 1.18 and 1.19?

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -
>
> Key: FLINK-32513
> URL: https://issues.apache.org/jira/browse/FLINK-32513
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.3, 1.16.1, 1.17.1
> Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>Reporter: Vladislav Keda
>Assignee: Jeyhun Karimov
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
> Attachments: image-2023-07-10-17-26-46-544.png
>
>
> A Flink job executed in BATCH mode with a significant number of 
> transformations (more than 30 in my case) takes a very long time to start 
> due to the method StreamGraphGenerator.existsUnboundedSource(). Also, during 
> the execution of the method, a lot of memory is consumed, which causes the 
> GC to fire frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> 

[jira] [Updated] (FLINK-34924) Support partition pushdown for join queries

2024-03-23 Thread Jeyhun Karimov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-34924:
---
Description: 
Consider the following tables: 
{code:java}
create table partitionedTable1 (
   a int, 
   b int, 
   c int)  
partitioned by (a, b) 
with ( ... ){code}
 
{code:java}
create table partitionedTable2 (
c int, 
d int, 
e int)  
 partitioned by (d, e) 
 with ( ... )  {code}
 

And the following queries:
{code:java}
select t1.b 
from partitionedTable1 t1 inner join partitionedTable2 t2 
on t1.a = t2.d 
where t1.a > 1

or 

select t1.b  from partitionedTable1 t1 inner join  partitionedTable2 t2 
on t1.a = t2.d and t1.b = t2.e 
where t1.a > 1{code}
 

For the above-mentioned queries, the partition pushdown rules in Flink 
currently consider only the filter clause (t1.a > 1) and push the related 
partitions to the source operator. 

However, we should also be able to push down partitions based on the join 
clause. Note that in the above-mentioned queries the partition columns are the 
same as the join fields (or a prefix subset of them). So, we can fetch the 
existing partitions from each table, intersect them, and push the intersection 
to their source operators. 

  was:
Consider the following tables: 
{code:java}
create table partitionedTable1 (
   a int, 
   b int, 
   c int)  
partitioned by (a, b) 
with ( ... ){code}
 
{code:java}
create table partitionedTable2 (
c int, 
d int, 
e int)  
 partitioned by (d, e) 
 with ( ... )  {code}
 

And the following query:
{code:java}
select t1.b 
from partitionedTable1 t1 inner join partitionedTable2 t2 
on t1.a = t2.d 
where t1.a > 1{code}
 

Currently, the partition pushdown only considers the filter clause (t1.a > 1) 
and pushes the related partitions to the source operator. 

However, we should also be able to push down partitions based on the join 
clause. Note that the partition columns are the same as the join fields. So, 
we can fetch the existing partitions from each table, intersect them, and push 
the intersection to their source operators. 


> Support partition pushdown for join queries
> ---
>
> Key: FLINK-34924
> URL: https://issues.apache.org/jira/browse/FLINK-34924
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Jeyhun Karimov
>Priority: Major
>
> Consider the following tables: 
> {code:java}
> create table partitionedTable1 (
>a int, 
>b int, 
>c int)  
> partitioned by (a, b) 
> with ( ... ){code}
>  
> {code:java}
> create table partitionedTable2 (
> c int, 
> d int, 
> e int)  
>  partitioned by (d, e) 
>  with ( ... )  {code}
>  
> And the following queries:
> {code:java}
> select t1.b 
> from partitionedTable1 t1 inner join partitionedTable2 t2 
> on t1.a = t2.d 
> where t1.a > 1
> or 
> select t1.b  from partitionedTable1 t1 inner join  partitionedTable2 t2 
> on t1.a = t2.d and t1.b = t2.e 
> where t1.a > 1{code}
>  
> For the above-mentioned queries, the partition pushdown rules in Flink 
> currently consider only the filter clause (t1.a > 1) and push the related 
> partitions to the source operator. 
> However, we should also be able to push down partitions based on the join 
> clause. Note that in the above-mentioned queries the partition columns are 
> the same as the join fields (or a prefix subset of them). So, we can fetch 
> the existing partitions from each table, intersect them, and push the 
> intersection to their source operators. 





[jira] [Created] (FLINK-34924) Support partition pushdown for join queries

2024-03-23 Thread Jeyhun Karimov (Jira)
Jeyhun Karimov created FLINK-34924:
--

 Summary: Support partition pushdown for join queries
 Key: FLINK-34924
 URL: https://issues.apache.org/jira/browse/FLINK-34924
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.19.0
Reporter: Jeyhun Karimov


Consider the following tables: 
{code:java}
create table partitionedTable1 (
   a int, 
   b int, 
   c int)  
partitioned by (a, b) 
with ( ... ){code}
 
{code:java}
create table partitionedTable2 (
c int, 
d int, 
e int)  
 partitioned by (d, e) 
 with ( ... )  {code}
 

And the following query:
{code:java}
select t1.b 
from partitionedTable1 t1 inner join partitionedTable2 t2 
on t1.a = t2.d 
where t1.a > 1{code}
 

Currently, the partition pushdown only considers the filter clause (t1.a > 1) 
and pushes the related partitions to the source operator. 

However, we should also be able to push down partitions based on the join 
clause. Note that the partition columns are the same as the join fields. So, 
we can fetch the existing partitions from each table, intersect them, and push 
the intersection to their source operators. 
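The idea can be sketched in plain Java: collect the join-key partition values on each side, intersect them, apply the existing filter, and keep only the partitions whose key survives. This is a minimal sketch, assuming each partition is represented as a map from partition column to value. `keyValues`, `survivingKeys` and `prunePartitions` are hypothetical helpers and not part of Flink's planner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.IntPredicate;

public class JoinPartitionPruning {

    /** Distinct join-key values across all partitions of one table. */
    public static Set<Integer> keyValues(List<Map<String, Integer>> partitions, String keyColumn) {
        Set<Integer> values = new TreeSet<>();
        for (Map<String, Integer> partition : partitions) {
            values.add(partition.get(keyColumn));
        }
        return values;
    }

    /** Join-key values that exist on both sides and pass the filter. */
    public static Set<Integer> survivingKeys(
            List<Map<String, Integer>> left, String leftKey,
            List<Map<String, Integer>> right, String rightKey,
            IntPredicate filter) {
        Set<Integer> keys = keyValues(left, leftKey);
        keys.retainAll(keyValues(right, rightKey)); // intersection of both sides
        keys.removeIf(k -> !filter.test(k));        // existing filter pushdown
        return keys;
    }

    /** Keeps only the partitions whose join-key value survived. */
    public static List<Map<String, Integer>> prunePartitions(
            List<Map<String, Integer>> partitions, String keyColumn, Set<Integer> keys) {
        List<Map<String, Integer>> kept = new ArrayList<>();
        for (Map<String, Integer> partition : partitions) {
            if (keys.contains(partition.get(keyColumn))) {
                kept.add(partition);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // partitionedTable1 is partitioned by (a, b); partitionedTable2 by (d, e).
        List<Map<String, Integer>> t1 = List.of(
                Map.of("a", 1, "b", 10), Map.of("a", 2, "b", 20), Map.of("a", 3, "b", 30));
        List<Map<String, Integer>> t2 = List.of(
                Map.of("d", 2, "e", 5), Map.of("d", 4, "e", 6));

        // Join on t1.a = t2.d with the filter t1.a > 1.
        Set<Integer> keys = survivingKeys(t1, "a", t2, "d", a -> a > 1);
        System.out.println(keys); // prints "[2]"
        System.out.println(prunePartitions(t1, "a", keys).size()); // prints "1"
        System.out.println(prunePartitions(t2, "d", keys).size()); // prints "1"
    }
}
```

In this example the filter alone would keep two partitions of the first table (a = 2 and a = 3) and would not prune the second table at all, while the intersection leaves a single partition to read on each side.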





[jira] [Commented] (FLINK-32367) lead function second param cause ClassCastException

2024-03-10 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825031#comment-17825031
 ] 

Jeyhun Karimov commented on FLINK-32367:


Hi [~zhou_yb], when I tried to reproduce this with the current master 
(d6a4eb966fbc47277e07b79e7c64939a62eb1d54), the LEAD function successfully 
accepted and executed an expression as its second parameter. For example, the following query 
works:
{code:java}
SELECT a, b, lead(b, a/2, 3) over (partition by a order by b), lag(b, 1, 3) 
over (partition by a order by b) FROM Table6{code}
Could you please verify, or am I missing something?

> lead function second param cause ClassCastException
> ---
>
> Key: FLINK-32367
> URL: https://issues.apache.org/jira/browse/FLINK-32367
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.4
>Reporter: zhou
>Priority: Major
> Attachments: image-2023-06-16-15-49-49-003.png, 
> image-2023-06-16-18-12-05-861.png
>
>
> !image-2023-06-16-18-12-05-861.png!!image-2023-06-16-15-49-49-003.png!
> When the second parameter of the lead function is an expression (window_length/2), an exception is thrown. 
> When the second parameter is a number, it works well.





[jira] [Commented] (FLINK-33579) Join sql error

2024-03-10 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825027#comment-17825027
 ] 

Jeyhun Karimov commented on FLINK-33579:


Hi [~waywtdcc] when I tried to reproduce locally as of the current master 
(d6a4eb966fbc47277e07b79e7c64939a62eb1d54), the above-mentioned exception seems 
to be gone. 

Could you please verify?

> Join sql error
> --
>
> Key: FLINK-33579
> URL: https://issues.apache.org/jira/browse/FLINK-33579
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.17.1
>Reporter: waywtdcc
>Priority: Major
>
>  
> {code:java}
> set pipeline.operator-chaining=true;
>  set execution.runtime-mode=BATCH;
>   set table.exec.disabled-operators = NestedLoopJoin;
> explain plan for
> select
> *
> from
> orders,
> supplier,
> customer
> where
> c_custkey = o_custkey and
> c_nationkey = s_nationkey {code}
>  
>  
>  
> error:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
>  
> FlinkLogicalJoin(condition=[AND(=($21, $2), =($24, $15))], joinType=[inner])
> :- FlinkLogicalJoin(condition=[true], joinType=[inner])
> :  :- FlinkLogicalTableSourceScan(table=[[paimon, tpch100g_paimon, orders]], 
> fields=[uuid, o_orderkey, o_custkey, o_orderstatus, o_totalprice, 
> o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, ts])
> :  +- FlinkLogicalTableSourceScan(table=[[paimon, tpch100g_paimon, 
> supplier]], fields=[uuid, s_suppkey, s_name, s_address, s_nationkey, s_phone, 
> s_acctbal, s_comment, ts])
> +- FlinkLogicalTableSourceScan(table=[[paimon, tpch100g_paimon, customer]], 
> fields=[uuid, c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, 
> c_mktsegment, c_comment, ts])
>  
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
>  
> at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
> at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
> at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
> at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
> at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
> at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
> at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
> at scala.collection.immutable.List.foreach(List.scala:388)
> at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
> at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
> at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:329)
> at 
> org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:541)
> at 
> org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:115)
> at 
> org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:47)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:620)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentInternal.explainInternal(TableEnvironmentInternal.java:96)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1296)
> at 
> 

[jira] [Commented] (FLINK-32679) Filter conditions cannot be pushed to JOIN in some case

2024-03-09 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824995#comment-17824995
 ] 

Jeyhun Karimov commented on FLINK-32679:


Hi [~grandfisher] the mentioned query does push the conditions down below the 
join operators (I verified this with the current master, 
d6a4eb966fbc47277e07b79e7c64939a62eb1d54; see the plan below). Or am I missing something? 

 
{code:java}
Calc(select=[CAST(0 AS INTEGER) AS id, b, CAST(0 AS INTEGER) AS id0, b0, CAST(0 
AS INTEGER) AS id1, b1, CAST(0 AS INTEGER) AS id2, b2, CAST(0 AS INTEGER) AS 
id3, b3])
+- Join(joinType=[InnerJoin], where=[true], select=[b, b0, b1, b2, b3], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[single])
   :  +- Join(joinType=[InnerJoin], where=[true], select=[b, b0, b1, b2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :     :- Exchange(distribution=[single])
   :     :  +- Join(joinType=[InnerJoin], where=[true], select=[b, b0, b1], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :     :     :- Exchange(distribution=[single])
   :     :     :  +- Join(joinType=[InnerJoin], where=[true], select=[b, b0], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :     :     :     :- Exchange(distribution=[single])
   :     :     :     :  +- Calc(select=[b], where=[(id = 0)])
   :     :     :     :     +- TableSourceScan(table=[[default_catalog, 
default_database, v1]], fields=[id, b])
   :     :     :     +- Exchange(distribution=[single])
   :     :     :        +- Calc(select=[b], where=[(id = 0)])
   :     :     :           +- TableSourceScan(table=[[default_catalog, 
default_database, v2]], fields=[id, b])
   :     :     +- Exchange(distribution=[single])
   :     :        +- Calc(select=[b], where=[(0 = id)])
   :     :           +- TableSourceScan(table=[[default_catalog, 
default_database, v3]], fields=[id, b])
   :     +- Exchange(distribution=[single])
   :        +- Calc(select=[b], where=[(0 = id)])
   :           +- TableSourceScan(table=[[default_catalog, default_database, 
v4]], fields=[id, b])
   +- Exchange(distribution=[single])
      +- Calc(select=[b], where=[(0 = id)])
         +- TableSourceScan(table=[[default_catalog, default_database, v5]], 
fields=[id, b]){code}

> Filter conditions cannot be pushed to JOIN in some case
> ---
>
> Key: FLINK-32679
> URL: https://issues.apache.org/jira/browse/FLINK-32679
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: grandfisher
>Priority: Major
>
> There is a case
> {code:java}
> SELECT a.id, b.id, c.id, d.id, e.id
>   , f.id
> FROM `table-v1` a
>   INNER JOIN `table-v2` b ON a.id = b.id
>   INNER JOIN `table-v3` c ON b.id = c.id
>   INNER JOIN `table-v4` d ON c.id = d.id
>   INNER JOIN `table-v5` e ON d.id = e.id
>   INNER JOIN `table-v6` f ON a.id = f.id
> WHERE f.id = 0
> {code}
> In this SQL, each table should have a condition {*}id=0{*}, but actually only 
> tables *f* and *a* have this condition.
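
The expectation above can be sketched by grouping columns that are connected 
through equi-join conditions into equivalence classes, so that a constant filter 
on one column applies to the whole class. This is a minimal illustration with 
hypothetical names (EquiJoinClasses), not the planner's actual rule:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical illustration, not Flink or Calcite code: union-find over column names. */
final class EquiJoinClasses {

    private final Map<String, String> parent = new HashMap<>();

    /** Returns the representative column of the class that contains the given column. */
    String find(String column) {
        parent.putIfAbsent(column, column);
        String p = parent.get(column);
        if (!p.equals(column)) {
            p = find(p);
            parent.put(column, p); // path compression
        }
        return p;
    }

    /** Records an equi-join condition "left = right" by merging the two classes. */
    void addEquality(String left, String right) {
        parent.put(find(left), find(right));
    }

    /** All known columns that must be equal to the given column. */
    Set<String> columnsEqualTo(String column) {
        String root = find(column);
        Set<String> result = new TreeSet<>();
        // Iterate over a copy of the keys because find() updates the map.
        for (String candidate : new ArrayList<>(parent.keySet())) {
            if (find(candidate).equals(root)) {
                result.add(candidate);
            }
        }
        return result;
    }
}
```

Feeding the five join conditions of the query into addEquality and then asking 
for columnsEqualTo("f.id") yields the id columns of all six tables, which is why 
the filter id = 0 could be applied to each of them.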





[jira] [Comment Edited] (FLINK-32998) if function result not correct

2024-03-09 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824988#comment-17824988
 ] 

Jeyhun Karimov edited comment on FLINK-32998 at 3/9/24 10:25 PM:
-

[~martijnvisser] [~zhou_yb]  I verified that as of master 
(d6a4eb966fbc47277e07b79e7c64939a62eb1d54) the issue seems to be fixed. 


was (Author: jeyhunkarimov):
[~martijnvisser] I verified that as of master 
(d6a4eb966fbc47277e07b79e7c64939a62eb1d54) the issue seems to be fixed. 

> if function result not correct
> --
>
> Key: FLINK-32998
> URL: https://issues.apache.org/jira/browse/FLINK-32998
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.15.4
>Reporter: zhou
>Priority: Major
> Attachments: image-2023-08-30-18-29-16-277.png, 
> image-2023-08-30-18-30-05-568.png
>
>
> *The IF function result is not correct: instead of the original field value, it returns a 
> truncated value of the field (word).* 
> code :
> !image-2023-08-30-18-29-16-277.png!
> result:
> !image-2023-08-30-18-30-05-568.png!
>  





[jira] [Commented] (FLINK-32998) if function result not correct

2024-03-09 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824988#comment-17824988
 ] 

Jeyhun Karimov commented on FLINK-32998:


[~martijnvisser] I verified that as of master 
(d6a4eb966fbc47277e07b79e7c64939a62eb1d54) the issue seems to be fixed. 

> if function result not correct
> --
>
> Key: FLINK-32998
> URL: https://issues.apache.org/jira/browse/FLINK-32998
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.15.4
>Reporter: zhou
>Priority: Major
> Attachments: image-2023-08-30-18-29-16-277.png, 
> image-2023-08-30-18-30-05-568.png
>
>
> *The IF function result is not correct: instead of the original field value, it returns a 
> truncated value of the field (word).* 
> code :
> !image-2023-08-30-18-29-16-277.png!
> result:
> !image-2023-08-30-18-30-05-568.png!
>  





[jira] [Commented] (FLINK-34535) Support JobPlanInfo for the explain result

2024-03-01 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822456#comment-17822456
 ] 

Jeyhun Karimov commented on FLINK-34535:


Hi [~heigebupahei] +1 for a FLIP, since the issue mainly touches/adds 
user-relevant changes.

I still don't fully get the motivation behind setting the parallelism; maybe 
check [this PR|https://github.com/apache/flink/pull/22818] and then rewrite 
your motivation accordingly?

 

> Support JobPlanInfo for the explain result
> --
>
> Key: FLINK-34535
> URL: https://issues.apache.org/jira/browse/FLINK-34535
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: yuanfenghu
>Priority: Major
>  Labels: pull-request-available
>
> In the Flink Sql Explain syntax, we can set ExplainDetails to plan 
> JSON_EXECUTION_PLAN, but we cannot plan JobPlanInfo. If we can explain this 
> part of the information, referring to JobPlanInfo, I can combine it with the 
> parameter `pipeline.jobvertex-parallelism-overrides` to set up my task 
> parallelism





[jira] [Commented] (FLINK-29370) Protobuf in flink-sql-protobuf is not shaded

2024-02-29 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822311#comment-17822311
 ] 

Jeyhun Karimov commented on FLINK-29370:


Hi [~tanjialiang] you might need to consider [this 
comment|https://github.com/apache/flink/pull/14376#issuecomment-1164395312] 
before relocating protobuf

> Protobuf in flink-sql-protobuf is not shaded
> 
>
> Key: FLINK-29370
> URL: https://issues.apache.org/jira/browse/FLINK-29370
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.16.0
>Reporter: Jark Wu
>Priority: Major
>
> The protobuf classes in flink-sql-protobuf are not shaded, which may lead to 
> class conflicts. Usually, SQL jars should shade commonly used dependencies, 
> e.g. flink-sql-avro: 
> https://github.com/apache/flink/blob/master/flink-formats/flink-sql-avro/pom.xml#L88
>  
> {code}
> ➜  Downloads jar tvf flink-sql-protobuf-1.16.0.jar | grep com.google
>  0 Tue Sep 13 20:23:44 CST 2022 com/google/
>  0 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/
>568 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/ProtobufInternalUtils.class
>  19218 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessage$Builder.class
>259 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessage$BuilderParent.class
>  10167 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/AbstractMessage.class
>   1486 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessageLite$Builder$LimitedInputStream.class
>  12399 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessageLite$Builder.class
>279 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessageLite$InternalOneOfEnu
> {code}





[jira] [Commented] (FLINK-34459) Results column names should match SELECT clause expression names

2024-02-19 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818489#comment-17818489
 ] 

Jeyhun Karimov commented on FLINK-34459:


Yes that makes sense. Sure thing

> Results column names should match SELECT clause expression names
> 
>
> Key: FLINK-34459
> URL: https://issues.apache.org/jira/browse/FLINK-34459
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.18.1
>Reporter: Jeyhun Karimov
>Priority: Minor
>
> When printing {{SQL SELECT}} results, Flink will output generated expression 
> name when the expression is not {{column reference or used with alias/over.}}
> For example, select a, a + 1 from T would result in 
> {code:java}
> +----+-----+---------+
> | op |   a |  EXPR$1 |
> +----+-----+---------+
> | +I |   1 |       2 |
> | +I |   1 |       2 |
> | +I |   1 |       2 |
> +----+-----+---------+
> {code}
> Instead of the generated {{EXPR$1}} it would be nice to have {{a + 1}} (which 
> is the case in some other data processing systems like Spark).





[jira] [Comment Edited] (FLINK-34459) Results column names should match SELECT clause expression names

2024-02-19 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818440#comment-17818440
 ] 

Jeyhun Karimov edited comment on FLINK-34459 at 2/19/24 10:15 AM:
--

Hi [~martijnvisser] yes, there are some tradeoffs. Using AS is always a 
solution, but then a user needs to modify the query [and maybe modify it back]. 
And for queries with many projection expressions, the user needs to remember the 
mapping between EXPR$X -> actual expression in the query. 

Some other systems like Spark do not truncate (at least for the large 
expressions I tried), while some like MySQL/SQLite truncate after some point (they 
decide where and how to truncate large expressions). 

So, we have multiple options to deal with very large expressions: fall back 
to the current (EXPR$X) version, truncate, etc.
WDYT?
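
To make the two options concrete, here is a minimal sketch, assuming the 
expression text is available as a string. ResultColumnNames, LongNamePolicy and 
maxLength are hypothetical names used for illustration only, not existing Flink 
code or options:

```java
/** Hypothetical sketch, not Flink's actual naming logic. */
final class ResultColumnNames {

    /** What to do when the expression text is longer than the allowed column name. */
    enum LongNamePolicy { FALLBACK_TO_GENERATED, TRUNCATE }

    /**
     * Derives a result column name from the text of a projection expression.
     * Short expressions are used as-is; long ones are either truncated or
     * replaced by the generated EXPR$index name, depending on the policy.
     */
    static String columnName(
            String expressionText, int index, int maxLength, LongNamePolicy policy) {
        String generated = "EXPR$" + index;
        // Collapse whitespace so that formatting differences do not leak into the name.
        String text = expressionText.trim().replaceAll("\\s+", " ");
        if (text.isEmpty()) {
            return generated;
        }
        if (text.length() <= maxLength) {
            return text;
        }
        if (policy == LongNamePolicy.TRUNCATE && maxLength > 3) {
            return text.substring(0, maxLength - 3) + "...";
        }
        return generated;
    }
}
```

With this sketch, a + 1 keeps its own text as the column name, while an 
expression longer than maxLength is either cut and suffixed with three dots or 
replaced by the generated name.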



was (Author: jeyhunkarimov):
Hi [~martijnvisser] yes, there are some tradeoffs. Using AS is always a 
solution, but then a user needs to modify the query [and maybe modify it back]. 
And for queries with many projection expressions, the user needs to remember the 
mapping between EXPR$X -> actual expression in the query. 

Some other vendors like Spark do not truncate (at least for the large 
expressions I tried), while some like MySQL/SQLite truncate after some point (they 
decide where and how to truncate large expressions). 

So, we have multiple options to deal with very large expressions: fall back 
to the current (EXPR$X) version, truncate, etc.
WDYT?


> Results column names should match SELECT clause expression names
> 
>
> Key: FLINK-34459
> URL: https://issues.apache.org/jira/browse/FLINK-34459
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.18.1
>Reporter: Jeyhun Karimov
>Priority: Minor
>
> When printing {{SQL SELECT}} results, Flink will output generated expression 
> name when the expression is not {{column reference or used with alias/over.}}
> For example, select a, a + 1 from T would result in 
> {code:java}
> +----+-----+---------+
> | op |   a |  EXPR$1 |
> +----+-----+---------+
> | +I |   1 |       2 |
> | +I |   1 |       2 |
> | +I |   1 |       2 |
> +----+-----+---------+
> {code}
> Instead of the generated {{EXPR$1}} it would be nice to have {{a + 1}} (which 
> is the case in some other data processing systems like Spark).





[jira] [Commented] (FLINK-34459) Results column names should match SELECT clause expression names

2024-02-19 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818440#comment-17818440
 ] 

Jeyhun Karimov commented on FLINK-34459:


Hi [~martijnvisser] yes, there are some tradeoffs. Using AS is always a 
solution, but then a user needs to modify the query [and maybe modify it back]. 
And for queries with many projection expressions, the user needs to remember the 
mapping between EXPR$X -> actual expression in the query. 

Some other vendors like Spark do not truncate (at least for the large 
expressions I tried), while some like MySQL/SQLite truncate after some point (they 
decide where and how to truncate large expressions). 

So, we have multiple options to deal with very large expressions: fall back 
to the current (EXPR$X) version, truncate, etc.
WDYT?


> Results column names should match SELECT clause expression names
> 
>
> Key: FLINK-34459
> URL: https://issues.apache.org/jira/browse/FLINK-34459
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.18.1
>Reporter: Jeyhun Karimov
>Priority: Minor
>
> When printing {{SQL SELECT}} results, Flink will output generated expression 
> name when the expression is not {{column reference or used with alias/over.}}
> For example, select a, a + 1 from T would result in 
> {code:java}
> +----+-----+---------+
> | op |   a |  EXPR$1 |
> +----+-----+---------+
> | +I |   1 |       2 |
> | +I |   1 |       2 |
> | +I |   1 |       2 |
> +----+-----+---------+
> {code}
> Instead of the generated {{EXPR$1}} it would be nice to have {{a + 1}} (which 
> is the case in some other data processing systems like Spark).





[jira] [Updated] (FLINK-34459) Results column names should match SELECT clause expression names

2024-02-18 Thread Jeyhun Karimov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-34459:
---
Description: 
When printing {{SQL SELECT}} results, Flink will output generated expression 
name when the expression is not {{column reference or used with alias/over.}}
For example, select a, a + 1 from T would result in 


{code:java}
+----+-----+---------+
| op |   a |  EXPR$1 |
+----+-----+---------+
| +I |   1 |       2 |
| +I |   1 |       2 |
| +I |   1 |       2 |
+----+-----+---------+
{code}

Instead of the generated {{EXPR$1}} it would be nice to have {{a + 1}} (which 
is the case in some other data processing systems like Spark).


  was:
When printing {{SQL SELECT}} results, Flink will output generated expression 
name when the expression type is not {{column reference or alias or over.}}
For example, select a, a + 1 from T would result in 


{code:java}
+----+-----+---------+
| op |   a |  EXPR$1 |
+----+-----+---------+
| +I |   1 |       2 |
| +I |   1 |       2 |
| +I |   1 |       2 |
+----+-----+---------+
{code}

Instead of the generated {{EXPR$1}} it would be nice to have {{a + 1}} (which 
is the case in some other data processing systems like Spark).



> Results column names should match SELECT clause expression names
> 
>
> Key: FLINK-34459
> URL: https://issues.apache.org/jira/browse/FLINK-34459
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.18.1
>Reporter: Jeyhun Karimov
>Priority: Minor
>
> When printing {{SQL SELECT}} results, Flink will output generated expression 
> name when the expression is not {{column reference or used with alias/over.}}
> For example, select a, a + 1 from T would result in 
> {code:java}
> +----+-----+---------+
> | op |   a |  EXPR$1 |
> +----+-----+---------+
> | +I |   1 |       2 |
> | +I |   1 |       2 |
> | +I |   1 |       2 |
> +----+-----+---------+
> {code}
> Instead of the generated {{EXPR$1}} it would be nice to have {{a + 1}} (which 
> is the case in some other data processing systems like Spark).





[jira] [Created] (FLINK-34459) Results column names should match SELECT clause expression names

2024-02-18 Thread Jeyhun Karimov (Jira)
Jeyhun Karimov created FLINK-34459:
--

 Summary: Results column names should match SELECT clause 
expression names
 Key: FLINK-34459
 URL: https://issues.apache.org/jira/browse/FLINK-34459
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Affects Versions: 1.18.1
Reporter: Jeyhun Karimov


When printing {{SQL SELECT}} results, Flink will output generated expression 
name when the expression type is not {{column reference or alias or over.}}
For example, select a, a + 1 from T would result in 


{code:java}
+----+-----+---------+
| op |   a |  EXPR$1 |
+----+-----+---------+
| +I |   1 |       2 |
| +I |   1 |       2 |
| +I |   1 |       2 |
+----+-----+---------+
{code}

Instead of the generated {{EXPR$1}} it would be nice to have {{a + 1}} (which 
is the case in some other data processing systems like Spark).






[jira] [Updated] (FLINK-34446) SqlValidatorException with LATERAL TABLE and JOIN

2024-02-15 Thread Jeyhun Karimov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-34446:
---
Description: 
found one regression issue. Query working Flink 1.17.2, but failing with Flink 
1.18.+

 
{code:java}
-- Query working Flink 1.17.2, but failing with Flink 1.18.+

-- -- [ERROR] Could not execute SQL statement. Reason:

-- -- org.apache.calcite.sql.validate.SqlValidatorException: Table 's' not found

SELECT * FROM sample as s,
LATERAL TABLE(split(s.id,'[01]'))
CROSS JOIN (VALUES ('A'), ('B'));
{code}

The problem is not related to the alias scope. Even if we replace 
split(s.id, ...) with split(id, ...), the error

{code:java}
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'id' 
not found in any table
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at 
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
{code}

will be generated. This seems to be a Calcite issue, since this test fails on 
Calcite v1.32 and does not fail on Calcite 1.29.0 and 1.30.0.
We tested it with Calcite versions 1.31.0, 1.32.0, 1.33.0, 1.34.0, 1.35.0, 
1.36.0 and the main branch (c774c313a81d01c4e3e77cf296d04839c5ab04c0). The 
issue still remains

  was:
found one regression issue. Query working Flink 1.17.2, but failing with Flink 
1.18.+

 
{code:java}
-- Query working Flink 1.17.2, but failing with Flink 1.18.+

-- -- [ERROR] Could not execute SQL statement. Reason:

-- -- org.apache.calcite.sql.validate.SqlValidatorException: Table 's' not found

SELECT * FROM sample as s,
LATERAL TABLE(split(s.id,'[01]'))
CROSS JOIN (VALUES ('A'), ('B'));
{code}

The problem is not related to the alias scope. Even if we replace 
split(s.id, ...) with split(id, ...), the error

{code:java}
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'id' 
not found in any table
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at 
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
{code}

will be generated. This seems to be a Calcite issue, since this test fails on 
Calcite v1.32 and does not fail on Calcite v1.29.
We tested it with Calcite versions calcite-1.32.0, calcite-1.33.0, 
calcite-1.34.0, calcite-1.35.0, calcite-1.36.0 and the main branch 
(c774c313a81d01c4e3e77cf296d04839c5ab04c0). The issue still remains


> SqlValidatorException with LATERAL TABLE and JOIN
> -
>
> Key: FLINK-34446
> URL: https://issues.apache.org/jira/browse/FLINK-34446
> Project: Flink
>  Issue Type: Bug
>Reporter: Jing Ge
>Assignee: Jeyhun Karimov
>Priority: Critical
>
> found one regression issue. Query working Flink 1.17.2, but failing with 
> Flink 1.18.+
>  
> {code:java}
> -- Query working Flink 1.17.2, but failing with Flink 1.18.+
> -- -- [ERROR] Could not execute SQL statement. Reason:
> -- -- org.apache.calcite.sql.validate.SqlValidatorException: Table 's' not 
> found
> SELECT * FROM sample as s,
> LATERAL TABLE(split(s.id,'[01]'))
> CROSS JOIN (VALUES ('A'), ('B'));
> {code}
> The problem is not related to the alias scope. Even if we replace 
> split(s.id, ...) with split(id, ...), the error
> {code:java}
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'id' 
> not found in any table
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> {code}
> will be generated. This seems to be a Calcite issue, since this test fails on 
> Calcite v1.32 and does not fail on Calcite 1.29.0 and 1.30.0.
> We tested it with Calcite versions 1.31.0, 1.32.0, 1.33.0, 1.34.0, 1.35.0, 
> 1.36.0 and the main branch (c774c313a81d01c4e3e77cf296d04839c5ab04c0). The 
> issue still remains





[jira] [Updated] (FLINK-34446) SqlValidatorException with LATERAL TABLE and JOIN

2024-02-15 Thread Jeyhun Karimov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-34446:
---
Description: 
found one regression issue. Query working Flink 1.17.2, but failing with Flink 
1.18.+

 
{code:java}
-- Query working Flink 1.17.2, but failing with Flink 1.18.+

-- -- [ERROR] Could not execute SQL statement. Reason:

-- -- org.apache.calcite.sql.validate.SqlValidatorException: Table 's' not found

SELECT * FROM sample as s,
LATERAL TABLE(split(s.id,'[01]'))
CROSS JOIN (VALUES ('A'), ('B'));
{code}

The problem is not related to the alias scope. Even if we replace 
split(s.id, ...) with split(id, ...), the error

{code:java}
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'id' 
not found in any table
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at 
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
{code}

will be generated. This seems to be a Calcite issue, since this test fails on 
Calcite v1.32 and does not fail on Calcite v1.29.
We tested it with Calcite versions calcite-1.32.0, calcite-1.33.0, 
calcite-1.34.0, calcite-1.35.0, calcite-1.36.0 and the main branch 
(c774c313a81d01c4e3e77cf296d04839c5ab04c0). The issue still remains

  was:
found one regression issue. Query working Flink 1.17.2, but failing with Flink 
1.18.+

 
{code:java}
-- Query working Flink 1.17.2, but failing with Flink 1.18.+

-- -- [ERROR] Could not execute SQL statement. Reason:

-- -- org.apache.calcite.sql.validate.SqlValidatorException: Table 's' not found

SELECT * FROM sample as s,
LATERAL TABLE(split(s.id,'[01]'))
CROSS JOIN (VALUES ('A'), ('B'));
{code}

The problem is not related to the alias scope. Even if we replace 
split(s.id, ...) with split(id, ...), the error

{code:java}
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'id' 
not found in any table
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at 
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
{code}

will be generated. This seems to be a Calcite issue, since this test fails on 
Calcite v1.32 and does not fail on Calcite v1.29.


> SqlValidatorException with LATERAL TABLE and JOIN
> -
>
> Key: FLINK-34446
> URL: https://issues.apache.org/jira/browse/FLINK-34446
> Project: Flink
>  Issue Type: Bug
>Reporter: Jing Ge
>Assignee: Jeyhun Karimov
>Priority: Critical
>
> found one regression issue. Query working Flink 1.17.2, but failing with 
> Flink 1.18.+
>  
> {code:java}
> -- Query working Flink 1.17.2, but failing with Flink 1.18.+
> -- -- [ERROR] Could not execute SQL statement. Reason:
> -- -- org.apache.calcite.sql.validate.SqlValidatorException: Table 's' not 
> found
> SELECT * FROM sample as s,
> LATERAL TABLE(split(s.id,'[01]'))
> CROSS JOIN (VALUES ('A'), ('B'));
> {code}
> The problem is not related to the alias scope. Even if we replace 
> split(s.id, ...) with split(id, ...), the error
> {code:java}
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'id' 
> not found in any table
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> {code}
> is generated. This seems to be a Calcite issue, since this test fails on 
> Calcite v1.32 and does not fail on Calcite v1.29.
> We tested it with Calcite versions calcite-1.32.0, calcite-1.33.0, 
> calcite-1.34.0, calcite-1.35.0, calcite-1.36.0 and the main branch 
> (c774c313a81d01c4e3e77cf296d04839c5ab04c0). The issue remains in all of them.





[jira] [Updated] (FLINK-34446) SqlValidatorException with LATERAL TABLE and JOIN

2024-02-15 Thread Jeyhun Karimov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-34446:
---
Summary: SqlValidatorException with LATERAL TABLE and JOIN  (was: 
SqlValidatorException with )

> SqlValidatorException with LATERAL TABLE and JOIN
> -
>
> Key: FLINK-34446
> URL: https://issues.apache.org/jira/browse/FLINK-34446
> Project: Flink
>  Issue Type: Bug
>Reporter: Jing Ge
>Assignee: Jeyhun Karimov
>Priority: Critical
>
> Found a regression: the query below works in Flink 1.17.2 but fails with 
> Flink 1.18.+
>  
> {code:java}
> -- Query working Flink 1.17.2, but failing with Flink 1.18.+
> -- -- [ERROR] Could not execute SQL statement. Reason:
> -- -- org.apache.calcite.sql.validate.SqlValidatorException: Table 's' not 
> found
> SELECT * FROM sample as s,
> LATERAL TABLE(split(s.id,'[01]'))
> CROSS JOIN (VALUES ('A'), ('B'));
> {code}
> The problem is not related to the alias scope. Even if we replace 
> split(s.id, ...) with split(id, ...), the following error
> {code:java}
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'id' 
> not found in any table
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> {code}
> is generated. This seems to be a Calcite issue, since this test fails on 
> Calcite v1.32 and does not fail on Calcite v1.29.





[jira] [Updated] (FLINK-34446) SqlValidatorException with

2024-02-15 Thread Jeyhun Karimov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-34446:
---
Summary: SqlValidatorException with   (was: regression: alias is not 
supported in lateral join with UDF)

> SqlValidatorException with 
> ---
>
> Key: FLINK-34446
> URL: https://issues.apache.org/jira/browse/FLINK-34446
> Project: Flink
>  Issue Type: Bug
>Reporter: Jing Ge
>Assignee: Jeyhun Karimov
>Priority: Critical
>
> Found a regression: the query below works in Flink 1.17.2 but fails with 
> Flink 1.18.+
>  
> {code:java}
> -- Query working Flink 1.17.2, but failing with Flink 1.18.+
> -- -- [ERROR] Could not execute SQL statement. Reason:
> -- -- org.apache.calcite.sql.validate.SqlValidatorException: Table 's' not 
> found
> SELECT * FROM sample as s,
> LATERAL TABLE(split(s.id,'[01]'))
> CROSS JOIN (VALUES ('A'), ('B'));
> {code}
> The problem is not related to the alias scope. Even if we replace 
> split(s.id, ...) with split(id, ...), the following error
> {code:java}
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'id' 
> not found in any table
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> {code}
> is generated. This seems to be a Calcite issue, since this test fails on 
> Calcite v1.32 and does not fail on Calcite v1.29.





[jira] [Updated] (FLINK-34446) SqlValidatorException with Lateral Table and CROSS JOIN

2024-02-15 Thread Jeyhun Karimov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-34446:
---
Summary: SqlValidatorException with Lateral Table and CROSS JOIN  (was: SQL)

> SqlValidatorException with Lateral Table and CROSS JOIN
> ---
>
> Key: FLINK-34446
> URL: https://issues.apache.org/jira/browse/FLINK-34446
> Project: Flink
>  Issue Type: Bug
>Reporter: Jing Ge
>Assignee: Jeyhun Karimov
>Priority: Critical
>
> Found a regression: the query below works in Flink 1.17.2 but fails with 
> Flink 1.18.+
>  
> {code:java}
> -- Query working Flink 1.17.2, but failing with Flink 1.18.+
> -- -- [ERROR] Could not execute SQL statement. Reason:
> -- -- org.apache.calcite.sql.validate.SqlValidatorException: Table 's' not 
> found
> SELECT * FROM sample as s,
> LATERAL TABLE(split(s.id,'[01]'))
> CROSS JOIN (VALUES ('A'), ('B'));
> {code}
> The problem is not related to the alias scope. Even if we replace 
> split(s.id, ...) with split(id, ...), the following error
> {code:java}
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'id' 
> not found in any table
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> {code}
> is generated. This seems to be a Calcite issue, since this test fails on 
> Calcite v1.32 and does not fail on Calcite v1.29.





[jira] [Updated] (FLINK-34446) SQL

2024-02-15 Thread Jeyhun Karimov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-34446:
---
Summary: SQL  (was: regression: alias is not in lateral join)

> SQL
> ---
>
> Key: FLINK-34446
> URL: https://issues.apache.org/jira/browse/FLINK-34446
> Project: Flink
>  Issue Type: Bug
>Reporter: Jing Ge
>Assignee: Jeyhun Karimov
>Priority: Critical
>
> Found a regression: the query below works in Flink 1.17.2 but fails with 
> Flink 1.18.+
>  
> {code:java}
> -- Query working Flink 1.17.2, but failing with Flink 1.18.+
> -- -- [ERROR] Could not execute SQL statement. Reason:
> -- -- org.apache.calcite.sql.validate.SqlValidatorException: Table 's' not 
> found
> SELECT * FROM sample as s,
> LATERAL TABLE(split(s.id,'[01]'))
> CROSS JOIN (VALUES ('A'), ('B'));
> {code}
> The problem is not related to the alias scope. Even if we replace 
> split(s.id, ...) with split(id, ...), the following error
> {code:java}
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'id' 
> not found in any table
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> {code}
> is generated. This seems to be a Calcite issue, since this test fails on 
> Calcite v1.32 and does not fail on Calcite v1.29.





[jira] [Updated] (FLINK-34446) regression: alias is not in lateral join

2024-02-15 Thread Jeyhun Karimov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-34446:
---
Description: 
Found a regression: the query below works in Flink 1.17.2 but fails with Flink 
1.18.+

 
{code:java}
-- Query working Flink 1.17.2, but failing with Flink 1.18.+

-- -- [ERROR] Could not execute SQL statement. Reason:

-- -- org.apache.calcite.sql.validate.SqlValidatorException: Table 's' not found

SELECT * FROM sample as s,
LATERAL TABLE(split(s.id,'[01]'))
CROSS JOIN (VALUES ('A'), ('B'));
{code}

The problem is not related to the alias scope. Even if we replace 
split(s.id, ...) with split(id, ...), the following error

{code:java}
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'id' 
not found in any table
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at 
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
{code}

is generated. This seems to be a Calcite issue, since this test fails on 
Calcite v1.32 and does not fail on Calcite v1.29.

  was:
Found a regression: the query below works in Flink 1.17.2 but fails with Flink 
1.18.+

 
{code:java}
-- Query working Flink 1.17.2, but failing with Flink 1.18.+

-- -- [ERROR] Could not execute SQL statement. Reason:

-- -- org.apache.calcite.sql.validate.SqlValidatorException: Table 's' not found

SELECT
a_or_b,
id, 
splits
FROM sample as s ,
LATERAL TABLE(split(s.id,'[01]')) lt(splits)
CROSS JOIN (VALUES ('A'), ('B')) AS cj(a_or_b); {code}


> regression: alias is not in lateral join
> 
>
> Key: FLINK-34446
> URL: https://issues.apache.org/jira/browse/FLINK-34446
> Project: Flink
>  Issue Type: Bug
>Reporter: Jing Ge
>Assignee: Jeyhun Karimov
>Priority: Critical
>
> Found a regression: the query below works in Flink 1.17.2 but fails with 
> Flink 1.18.+
>  
> {code:java}
> -- Query working Flink 1.17.2, but failing with Flink 1.18.+
> -- -- [ERROR] Could not execute SQL statement. Reason:
> -- -- org.apache.calcite.sql.validate.SqlValidatorException: Table 's' not 
> found
> SELECT * FROM sample as s,
> LATERAL TABLE(split(s.id,'[01]'))
> CROSS JOIN (VALUES ('A'), ('B'));
> {code}
> The problem is not related to the alias scope. Even if we replace 
> split(s.id, ...) with split(id, ...), the following error
> {code:java}
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'id' 
> not found in any table
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> {code}
> is generated. This seems to be a Calcite issue, since this test fails on 
> Calcite v1.32 and does not fail on Calcite v1.29.





[jira] [Updated] (FLINK-34442) Support optimizations for pre-partitioned [external] data sources

2024-02-13 Thread Jeyhun Karimov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-34442:
---
Description: 
There are some use-cases in which data sources are pre-partitioned:

- Kafka broker is already partitioned w.r.t. some key[s]
- There are multiple [Flink] jobs  that materialize their outputs and read them 
as input subsequently

One of the main benefits is that we might avoid unnecessary shuffling. 
There is already an experimental feature in DataStream to support a subset of 
these [1].
We should support this for Flink Table/SQL as well. 

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/

  was:
There are some use-cases in which data sources are pre-partitioned:

- Kafka broker is already partitioned w.r.t. some key
- There are multiple Flink jobs  that materialize their outputs and read them 
as input subsequently

One of the main benefits is that we might avoid unnecessary shuffling. 
There is already an experimental feature in DataStream to support a subset of 
these [1].
We should support this for Flink Table/SQL as well. 

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/


> Support optimizations for pre-partitioned [external] data sources
> -
>
> Key: FLINK-34442
> URL: https://issues.apache.org/jira/browse/FLINK-34442
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Jeyhun Karimov
>Priority: Major
>
> There are some use-cases in which data sources are pre-partitioned:
> - Kafka broker is already partitioned w.r.t. some key[s]
> - There are multiple [Flink] jobs  that materialize their outputs and read 
> them as input subsequently
> One of the main benefits is that we might avoid unnecessary shuffling. 
> There is already an experimental feature in DataStream to support a subset of 
> these [1].
> We should support this for Flink Table/SQL as well. 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/





[jira] [Created] (FLINK-34442) Support optimizations for pre-partitioned [external] data sources

2024-02-13 Thread Jeyhun Karimov (Jira)
Jeyhun Karimov created FLINK-34442:
--

 Summary: Support optimizations for pre-partitioned [external] data 
sources
 Key: FLINK-34442
 URL: https://issues.apache.org/jira/browse/FLINK-34442
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API, Table SQL / Planner
Affects Versions: 1.18.1
Reporter: Jeyhun Karimov


There are some use-cases in which data sources are pre-partitioned:

- Kafka broker is already partitioned w.r.t. some key
- There are multiple Flink jobs  that materialize their outputs and read them 
as input subsequently

One of the main benefits is that we might avoid unnecessary shuffling. 
There is already an experimental feature in DataStream to support a subset of 
these [1].
We should support this for Flink Table/SQL as well. 

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/





[jira] [Commented] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-09 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816252#comment-17816252
 ] 

Jeyhun Karimov commented on FLINK-34378:


Hi [~xuyangzhong], the ordering differs even with parallelism 1 because of the 
{{Set}} used in the {{MiniBatch}} operator. IMO this is expected behavior.  
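
As a minimal illustration of the point above (this is not the MiniBatch operator code; the class, method and key names are hypothetical), a hash-based {{Set}} gives no guarantee about iteration order, whereas an insertion-ordered one does. If the buffered records are iterated from a hash-based collection, the output order can differ from the arrival order regardless of parallelism:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: buffering keys in a Set and emitting them in iteration order. */
class SetOrderSketch {

    /** Adds all arrivals to the buffer and returns them in the buffer's iteration order. */
    static List<String> drain(Set<String> buffer, List<String> arrivals) {
        buffer.addAll(arrivals);
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        List<String> arrivals =
                List.of("key-1", "key-2", "key-3", "key-4", "key-5", "key-6", "key-7", "key-8");

        // HashSet: iteration order depends on hash codes, not on arrival order.
        System.out.println("HashSet:       " + drain(new HashSet<>(), arrivals));

        // LinkedHashSet: iteration order is the insertion (arrival) order.
        System.out.println("LinkedHashSet: " + drain(new LinkedHashSet<>(), arrivals));
    }
}
```

Only the {{LinkedHashSet}} variant is guaranteed to print the keys in arrival order; the {{HashSet}} variant may or may not, which is why the join output order should not be relied upon.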

> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> I'm not sure if it's a bug. The following case can reproduce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
>     row(1, "1"),
>     row(2, "2"),
>     row(3, "3"),
>     row(4, "4"),
>     row(5, "5"),
>     row(6, "6"),
>     row(7, "7"),
>     row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
>     s"""
>        |CREATE TABLE t1 (
>        |  a int,
>        |  b string
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$dataId',
>        |  'bounded' = 'false'
>        |)
>      """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
>     s"""
>        |CREATE TABLE t2 (
>        |  a int,
>        |  b string
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$dataId',
>        |  'bounded' = 'false'
>        |)
>      """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}
> Result
> {code:java}
> +----+---+---+----+----+
> | op | a | b | a0 | b0 |
> +----+---+---+----+----+
> | +I | 3 | 3 |  3 |  3 |
> | +I | 7 | 7 |  7 |  7 |
> | +I | 2 | 2 |  2 |  2 |
> | +I | 5 | 5 |  5 |  5 |
> | +I | 1 | 1 |  1 |  1 |
> | +I | 6 | 6 |  6 |  6 |
> | +I | 4 | 4 |  4 |  4 |
> | +I | 8 | 8 |  8 |  8 |
> +----+---+---+----+----+
> {code}
> When I do not use minibatch join, the result is :
> {code:java}
> +----+---+---+----+----+
> | op | a | b | a0 | b0 |
> +----+---+---+----+----+
> | +I | 1 | 1 |  1 |  1 |
> | +I | 2 | 2 |  2 |  2 |
> | +I | 3 | 3 |  3 |  3 |
> | +I | 4 | 4 |  4 |  4 |
> | +I | 5 | 5 |  5 |  5 |
> | +I | 6 | 6 |  6 |  6 |
> | +I | 7 | 7 |  7 |  7 |
> | +I | 8 | 8 |  8 |  8 |
> +----+---+---+----+----+
>  {code}
>  





[jira] [Commented] (FLINK-34129) MiniBatchGlobalGroupAggFunction will make -D as +I then make +I as -U when state expired

2024-02-09 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816250#comment-17816250
 ] 

Jeyhun Karimov commented on FLINK-34129:


Hi [~loserwang1024], [~xuyangzhong], I am not sure whether this is a bug or 
expected behaviour in local-global aggregation. 

Partitioned aggregates (see {{GroupAggFunction::processElement}}) solve the 
above-mentioned issue by tracking the {{firstRow}} and not sending the first 
row to the {{retract}} function. In this case, since the state is partitioned 
and only one operator instance is responsible for each partition, we can avoid 
the above-mentioned behaviour. 

In the presence of local-global aggregates, however:
- it is difficult to prevent the above-mentioned behaviour in 
{{LocalGroupAggFunction}} instances, since there can be multiple 
{{LocalGroupAggFunction}} instances and there is no ordering among them (to 
track {{firstRow}} and to avoid it being retracted)
- it is difficult to prevent the above-mentioned behaviour in 
{{GlobalGroupAggFunction}} instances, since they already receive pre-aggregated 
data. 

Currently, the only ways to avoid this behaviour are to either

- Use the {{firstRow}} tracking (similar to 
{{GroupAggFunction::processElement}}) in {{LocalGroupAggFunction}} AND use 
parallelism 1
- Use the partitioned aggregates
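
To make the reasoning above concrete, here is a toy model of a retractable SUM for a single key. It is only a sketch under stated assumptions: {{FirstRowSketch}}, {{process}} and the {{dropRetractOnEmptyState}} flag are hypothetical names, not Flink classes, and the guard is my simplified reading of how a partitioned aggregate can avoid retracting a row it has never seen. With the guard off, the model reproduces the sequence from the issue (-U[1, 20], +U[1, 20] after state expiry yields +I[1, -20], -D[1, -20]); with the guard on, the stray retraction is ignored.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a retractable SUM for key 1; not Flink's actual implementation. */
class FirstRowSketch {
    private Long sum;   // null models missing or expired state
    private long count; // accumulated records minus retracted records
    private final boolean dropRetractOnEmptyState;
    final List<String> out = new ArrayList<>();

    FirstRowSketch(boolean dropRetractOnEmptyState) {
        this.dropRetractOnEmptyState = dropRetractOnEmptyState;
    }

    /** Processes one change; retract=true models -U/-D, retract=false models +I/+U. */
    void process(boolean retract, long value) {
        boolean firstRow = false;
        if (sum == null) {
            if (retract && dropRetractOnEmptyState) {
                return; // nothing was accumulated for this key, so there is nothing to retract
            }
            sum = 0L;
            count = 0L;
            firstRow = true;
        }
        long prev = sum;
        sum += retract ? -value : value;
        count += retract ? -1 : 1;

        if (count != 0) {
            if (firstRow) {
                out.add("+I[1, " + sum + "]");
            } else {
                out.add("-U[1, " + prev + "]");
                out.add("+U[1, " + sum + "]");
            }
        } else {
            if (!firstRow) {
                out.add("-D[1, " + prev + "]");
            }
            sum = null; // record count is zero: clear the state
        }
    }

    public static void main(String[] args) {
        FirstRowSketch naive = new FirstRowSketch(false);
        naive.process(true, 20);  // -U[1, 20] arrives after state expiry
        naive.process(false, 20); // +U[1, 20]
        System.out.println(naive.out); // prints [+I[1, -20], -D[1, -20]]

        FirstRowSketch guarded = new FirstRowSketch(true);
        guarded.process(true, 20);
        guarded.process(false, 20);
        System.out.println(guarded.out); // prints [+I[1, 20]]
    }
}
```

The guard works in this model because a single instance sees every record for the key. As noted above, that assumption does not hold for multiple local aggregate instances, and the global instance only sees pre-aggregated data.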

> MiniBatchGlobalGroupAggFunction will make -D as +I then make +I as -U when 
> state expired 
> -
>
> Key: FLINK-34129
> URL: https://issues.apache.org/jira/browse/FLINK-34129
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.1
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: 1.19.0
>
>
> Take sum for example:
> When state has expired and an update operation then arrives from the source, 
> MiniBatchGlobalGroupAggFunction takes -U[1, 20] and +U[1, 20] as input, but 
> emits +I[1, -20] and -D[1, -20]. The sink will delete the data from the 
> external database.
> Let's see why this happens:
>  * When state is expired and -U[1, 20] arrives, 
> MiniBatchGlobalGroupAggFunction will create a new sum accumulator and set 
> firstRow to true.
> {code:java}
> if (stateAcc == null) { 
>     stateAcc = globalAgg.createAccumulators(); 
>     firstRow = true; 
> }   {code}
>  * Then the sum accumulator retracts the value, making the sum -20.
>  * Because this is the first row, MiniBatchGlobalGroupAggFunction changes the 
> -U to +I and emits it downstream.
> {code:java}
> if (!recordCounter.recordCountIsZero(acc)) {
>    // if this was not the first row and we have to emit retractions
>     if (!firstRow) {
>        // ignore
>     } else {
>     // update acc to state
>     accState.update(acc);
>  
>    // this is the first, output new result
>    // prepare INSERT message for new row
>    resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
>    out.collect(resultRow);
> }  {code}
>  * When the next +U[1, 20] arrives, the sum accumulator brings the sum value 
> back to 0, so RetractionRecordCounter#recordCountIsZero returns true. Because 
> firstRow = false now, the +U is changed to -D and emitted downstream.
> {code:java}
> if (!recordCounter.recordCountIsZero(acc)) {
>     // ignore
> }else{
>    // we retracted the last record for this key
>    // if this is not first row sent out a DELETE message
>    if (!firstRow) {
>    // prepare DELETE message for previous row
>    resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
>    out.collect(resultRow);
> } {code}
>  
> So the sink receives +I and -D after a source update operation, and the data 
> is deleted.





[jira] [Comment Edited] (FLINK-34366) Add support to group rows by column ordinals

2024-02-06 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814898#comment-17814898
 ] 

Jeyhun Karimov edited comment on FLINK-34366 at 2/6/24 5:10 PM:


Hi [~martijnvisser], I worked on this issue. Could you please check the PR when 
you have time? Thanks!


was (Author: jeyhunkarimov):
Hi [~martijnvisser] I worked on this issue. Could you please check the PR?

> Add support to group rows by column ordinals
> 
>
> Key: FLINK-34366
> URL: https://issues.apache.org/jira/browse/FLINK-34366
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>
> Reference: BigQuery 
> https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#group_by_col_ordinals
> The GROUP BY clause can refer to expression names in the SELECT list. The 
> GROUP BY clause also allows ordinal references to expressions in the SELECT 
> list, using integer values. 1 refers to the first value in the SELECT list, 2 
> the second, and so forth. The value list can combine ordinals and value 
> names. The following queries are equivalent:
> {code:sql}
> WITH PlayerStats AS (
>   SELECT 'Adams' as LastName, 'Noam' as FirstName, 3 as PointsScored UNION ALL
>   SELECT 'Buchanan', 'Jie', 0 UNION ALL
>   SELECT 'Coolidge', 'Kiran', 1 UNION ALL
>   SELECT 'Adams', 'Noam', 4 UNION ALL
>   SELECT 'Buchanan', 'Jie', 13)
> SELECT SUM(PointsScored) AS total_points, LastName, FirstName
> FROM PlayerStats
> GROUP BY LastName, FirstName;
> /*--------------+----------+-----------+
>  | total_points | LastName | FirstName |
>  +--------------+----------+-----------+
>  | 7            | Adams    | Noam      |
>  | 13           | Buchanan | Jie       |
>  | 1            | Coolidge | Kiran     |
>  +--------------+----------+-----------*/
> {code}
> {code:sql}
> WITH PlayerStats AS (
>   SELECT 'Adams' as LastName, 'Noam' as FirstName, 3 as PointsScored UNION ALL
>   SELECT 'Buchanan', 'Jie', 0 UNION ALL
>   SELECT 'Coolidge', 'Kiran', 1 UNION ALL
>   SELECT 'Adams', 'Noam', 4 UNION ALL
>   SELECT 'Buchanan', 'Jie', 13)
> SELECT SUM(PointsScored) AS total_points, LastName, FirstName
> FROM PlayerStats
> GROUP BY 2, 3;
> /*--------------+----------+-----------+
>  | total_points | LastName | FirstName |
>  +--------------+----------+-----------+
>  | 7            | Adams    | Noam      |
>  | 13           | Buchanan | Jie       |
>  | 1            | Coolidge | Kiran     |
>  +--------------+----------+-----------*/
> {code}
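
The ordinal rule quoted above (1 refers to the first SELECT-list expression, 2 to the second, and ordinals may be mixed with names) can be sketched as a small resolution step. This is an illustrative sketch only, not the planner change in the PR; {{GroupByOrdinalSketch}} and {{resolve}} are hypothetical names, and expressions are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: rewriting GROUP BY ordinals into SELECT-list expressions. */
class GroupByOrdinalSketch {

    /** Replaces each 1-based integer ordinal in groupBy with the matching SELECT-list item. */
    static List<String> resolve(List<String> selectList, List<String> groupBy) {
        List<String> resolved = new ArrayList<>();
        for (String item : groupBy) {
            if (item.matches("\\d+")) {
                int ordinal = Integer.parseInt(item);
                if (ordinal < 1 || ordinal > selectList.size()) {
                    throw new IllegalArgumentException(
                            "GROUP BY ordinal out of range: " + ordinal);
                }
                resolved.add(selectList.get(ordinal - 1)); // ordinals are 1-based
            } else {
                resolved.add(item); // already a name or expression
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        List<String> select = List.of("SUM(PointsScored)", "LastName", "FirstName");
        System.out.println(resolve(select, List.of("2", "3")));        // prints [LastName, FirstName]
        System.out.println(resolve(select, List.of("LastName", "3"))); // prints [LastName, FirstName]
    }
}
```

Both calls resolve to the same grouping keys, which is why the two queries above are equivalent.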





[jira] [Commented] (FLINK-34366) Add support to group rows by column ordinals

2024-02-06 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814898#comment-17814898
 ] 

Jeyhun Karimov commented on FLINK-34366:


Hi [~martijnvisser] I worked on this issue. Could you please check the PR?

> Add support to group rows by column ordinals
> 
>
> Key: FLINK-34366
> URL: https://issues.apache.org/jira/browse/FLINK-34366
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>
> Reference: BigQuery 
> https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#group_by_col_ordinals
> The GROUP BY clause can refer to expression names in the SELECT list. The 
> GROUP BY clause also allows ordinal references to expressions in the SELECT 
> list, using integer values. 1 refers to the first value in the SELECT list, 2 
> the second, and so forth. The value list can combine ordinals and value 
> names. The following queries are equivalent:
> {code:sql}
> WITH PlayerStats AS (
>   SELECT 'Adams' as LastName, 'Noam' as FirstName, 3 as PointsScored UNION ALL
>   SELECT 'Buchanan', 'Jie', 0 UNION ALL
>   SELECT 'Coolidge', 'Kiran', 1 UNION ALL
>   SELECT 'Adams', 'Noam', 4 UNION ALL
>   SELECT 'Buchanan', 'Jie', 13)
> SELECT SUM(PointsScored) AS total_points, LastName, FirstName
> FROM PlayerStats
> GROUP BY LastName, FirstName;
> /*--------------+----------+-----------+
>  | total_points | LastName | FirstName |
>  +--------------+----------+-----------+
>  | 7            | Adams    | Noam      |
>  | 13           | Buchanan | Jie       |
>  | 1            | Coolidge | Kiran     |
>  +--------------+----------+-----------*/
> {code}
> {code:sql}
> WITH PlayerStats AS (
>   SELECT 'Adams' as LastName, 'Noam' as FirstName, 3 as PointsScored UNION ALL
>   SELECT 'Buchanan', 'Jie', 0 UNION ALL
>   SELECT 'Coolidge', 'Kiran', 1 UNION ALL
>   SELECT 'Adams', 'Noam', 4 UNION ALL
>   SELECT 'Buchanan', 'Jie', 13)
> SELECT SUM(PointsScored) AS total_points, LastName, FirstName
> FROM PlayerStats
> GROUP BY 2, 3;
> /*--------------+----------+-----------+
>  | total_points | LastName | FirstName |
>  +--------------+----------+-----------+
>  | 7            | Adams    | Noam      |
>  | 13           | Buchanan | Jie       |
>  | 1            | Coolidge | Kiran     |
>  +--------------+----------+-----------*/
> {code}





[jira] [Commented] (FLINK-33790) Upsert statement filter unique key field colume in mysql dielact

2024-02-04 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814125#comment-17814125
 ] 

Jeyhun Karimov commented on FLINK-33790:


Hi [~lijingwei.5018], thanks for reporting this issue. There is a similar test, 
{code:java}
FieldNamedPreparedStatementImplTest::testUpsertStatement
{code} 
with field names = 
{code:java}
{"id", "name", "email", "ts", "field1", "field_2", "__field_3__"}
{code}
 and key fields = 
{code:java}
{"id", "__field_3__"}
{code}
The test seems to pass. Am I missing something?



> Upsert statement filter unique key field colume in mysql dielact 
> -
>
> Key: FLINK-33790
> URL: https://issues.apache.org/jira/browse/FLINK-33790
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Reporter: JingWei Li
>Priority: Minor
>
> Example: `col2` and `col4` are unique keys in table `my_table`
>  
> {code:java}
> INSERT INTO `my_table`(`col1`, `col2`, `col3`, `col4`, `col5`) 
> VALUES (?, ?, ?, ?, ?)
> ON DUPLICATE KEY UPDATE 
> `col1`=VALUES(`col1`),
> `col2`=VALUES(`col2`),
> `col3`=VALUES(`col3`),
> `col4`=VALUES(`col4`),
> `col5`=VALUES(`col5`){code}
> result:
> {code:java}
> INSERT INTO `my_table`(`col1`, `col2`, `col3`, `col4`, `col5`) 
> VALUES (?, ?, ?, ?, ?)
> ON DUPLICATE KEY UPDATE 
> `col1`=VALUES(`col1`),
> `col3`=VALUES(`col3`),
> `col5`=VALUES(`col5`) {code}
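
A minimal sketch of the statement shape requested above: the unique key columns stay in the INSERT column list but are left out of the ON DUPLICATE KEY UPDATE clause. This is not the code of the Flink JDBC MySQL dialect; {{UpsertStatementSketch}}, {{upsert}} and {{quote}} are hypothetical helpers used only for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch: MySQL upsert that skips unique key columns in the UPDATE clause. */
class UpsertStatementSketch {

    static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    /**
     * Builds INSERT ... ON DUPLICATE KEY UPDATE for the given fields.
     * Assumes at least one field is not a unique key; otherwise the UPDATE clause would be empty.
     */
    static String upsert(String table, List<String> fields, List<String> uniqueKeys) {
        String columns =
                fields.stream().map(UpsertStatementSketch::quote).collect(Collectors.joining(", "));
        String placeholders = fields.stream().map(f -> "?").collect(Collectors.joining(", "));
        String updates =
                fields.stream()
                        .filter(f -> !uniqueKeys.contains(f)) // key columns need no update
                        .map(f -> quote(f) + "=VALUES(" + quote(f) + ")")
                        .collect(Collectors.joining(", "));
        return "INSERT INTO " + quote(table) + "(" + columns + ")"
                + " VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        System.out.println(
                upsert(
                        "my_table",
                        List.of("col1", "col2", "col3", "col4", "col5"),
                        List.of("col2", "col4")));
    }
}
```

For the example table, the UPDATE clause produced by this sketch contains only `col1`, `col3` and `col5`.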
>  





[jira] [Commented] (FLINK-33989) Insert Statement With Filter Operation Generates Extra Tombstone using Upsert Kafka Connector

2024-02-03 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813995#comment-17813995
 ] 

Jeyhun Karimov commented on FLINK-33989:


Hi [~flaviu.cicio], with the latest version of {{flink-connector-kafka}} I was 
not able to reproduce the issue. In both cases, I am getting 


{code:java}
{"id":1,"comment":"abc"}
{"id":1,"comment":"abcd"}
{code}

as an output. Could you please verify?

> Insert Statement With Filter Operation Generates Extra Tombstone using Upsert 
> Kafka Connector
> -
>
> Key: FLINK-33989
> URL: https://issues.apache.org/jira/browse/FLINK-33989
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Table SQL / Runtime
>Affects Versions: 1.17.2
>Reporter: Flaviu Cicio
>Priority: Major
>
> Given the following Flink SQL tables:
> {code:sql}
> CREATE TABLE input (
>   id STRING NOT NULL, 
>   current_value STRING NOT NULL, 
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka', 
>   'topic' = 'input', 
>   'key.format' = 'raw', 
>   'properties.bootstrap.servers' = 'kafka:29092', 
>   'properties.group.id' = 'your_group_id', 
>   'value.format' = 'json'
> );
> CREATE TABLE output (
>   id STRING NOT NULL, 
>   current_value STRING NOT NULL, 
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka', 
>   'topic' = 'output', 
>   'key.format' = 'raw', 
>   'properties.bootstrap.servers' = 'kafka:29092', 
>   'properties.group.id' = 'your_group_id', 
>   'value.format' = 'json'
> ); {code}
> And, the following entries are present in the input Kafka topic:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> If we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input; {code}
> The following entries are published to the output Kafka topic:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> But, if we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); 
> {code}
> The following entries are published:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   null,
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> We would expect the result to be the same for both insert statements.
> As we can see, there is an extra tombstone generated as a result of the 
> second statement.
>  
> Moreover, if we make a select on the input table:
> {code:sql}
> SELECT * FROM input;
> {code}
> We will get the following entries:
> ||op||id||current_value||
> |I|1|abc|
> |-U|1|abc|
> |+U|1|abcd|
> We expected to see only the insert and the update_after entries.
> The update_before is added at DeduplicateFunctionHelper#122.
> This is easily reproducible with this test that we added in the 
> UpsertKafkaTableITCase from flink-connector-kafka:
> {code:java}
> @Test
> public void testAggregateFilterOmit() throws Exception {
>     String topic = COUNT_FILTER_TOPIC + "_" + format;
>     createTestTopic(topic, 1, 1);
>     env.setParallelism(1);
>     // ---------- test ----------
>     countFilterToUpsertKafkaOmitUpdateBefore(topic);
>     // ---------- clean up ----------
>     deleteTestTopic(topic);
> }
> private void countFilterToUpsertKafkaOmitUpdateBefore(String table) 
>         throws Exception {
>     String bootstraps = getBootstrapServers();
>     List<Row> data =
>             Arrays.asList(
>                     Row.of(1, "Hi"),
>                     Row.of(1, "Hello"),
>                     Row.of(2, "Hello world"),
>                     Row.of(2, "Hello world, how are you?"),
>                     Row.of(2, "I am fine."),
>                     Row.of(3, "Luke Skywalker"),
>                     Row.of(3, "Comment#1"),
>                     Row.of(3, "Comment#2"),
>                     Row.of(4, "Comment#3"),
>                     Row.of(4, null));
>     final String createSource =
>             String.format(
>                     "CREATE TABLE aggfilter_%s ("
>                             + "  `id` INT,\n"
>                             + "  `comment` STRING\n"
>                             + ") WITH ("
>                             + "  'connector' = 'values',"
>                             + "  'data-id' = '%s'"
>                             + ")",
>                     format, TestValuesTableFactory.registerData(data));
>     tEnv.executeSql(createSource);
> 

[jira] [Commented] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.

2024-01-27 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811594#comment-17811594
 ] 

Jeyhun Karimov commented on FLINK-33996:


Hi [~libenchao] Thanks for your comment. I agree that supporting expr reuse in 
the codegen phase would be a better solution. It requires a bit of time, but I 
already started working on this (FLINK-21573) and will ping you guys once I am 
finished. 

> Support disabling project rewrite when multiple exprs in the project 
> reference the same sub project field.
> --
>
> Key: FLINK-33996
> URL: https://issues.apache.org/jira/browse/FLINK-33996
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Feng Jin
>Priority: Major
>  Labels: pull-request-available
>
> When multiple top projects reference the same bottom project, project rewrite 
> rules may result in complex projects being calculated multiple times.
> Take the following SQL as an example:
> {code:sql}
> create table test_source(a varchar) with ('connector'='datagen');
> explain plan for select a || 'a' as a, a || 'b' as b FROM (select 
> REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
> {code}
> The final SQL plan is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>    +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, 
> ||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> {code}
> It can be observed that after the project rewrite, REGEXP_REPLACE is calculated twice. 
> Generally speaking, regular expression matching is a time-consuming operation 
> and we usually do not want it to be calculated multiple times. Therefore, for 
> this scenario, we can support disabling project rewrite.
> After disabling some rules, the final plan we obtained is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>    +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
>    +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
>    +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> {code}
> After testing, we probably need to modify these few rules:
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectCalcMergeRule





[jira] [Commented] (FLINK-34135) A number of ci failures with Access to the path '.../_work/_temp/containerHandlerInvoker.js' is denied.

2024-01-23 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810104#comment-17810104
 ] 

Jeyhun Karimov commented on FLINK-34135:


Hi [~mapohl] yes CI seems stable again, we can close the issue

> A number of ci failures with Access to the path 
> '.../_work/_temp/containerHandlerInvoker.js' is denied.
> ---
>
> Key: FLINK-34135
> URL: https://issues.apache.org/jira/browse/FLINK-34135
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Reporter: Sergey Nuyanzin
>Assignee: Jeyhun Karimov
>Priority: Critical
>  Labels: test-stability
>
> There are a number of builds failing with something like 
> {noformat}
> ##[error]Access to the path 
> '/home/agent03/myagent/_work/_temp/containerHandlerInvoker.js' is denied.
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56490=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=fb588352-ef18-568d-b447-699986250ccb
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56481=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=554d7c3f-d38e-55f4-96b4-ada3a9cb7d6f=9
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56481=logs=fa307d6d-91b1-5ab6-d460-ef50f552b1fe=1798d435-832b-51fe-a9ad-efb9abf4ab04=9
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56481=logs=a1ac4ce4-9a4f-5fdb-3290-7e163fba19dc=e4c57254-ec06-5788-3f8e-5ad5dffb418e=9
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56481=logs=2c3cbe13-dee0-5837-cf47-3053da9a8a78=56881383-f398-5091-6b3b-22a7eeb7cfa8=9
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56481=logs=b0a398c0-685b-599c-eb57-c8c2a771138e=2d9c27d0-8dbb-5be9-7271-453f74f48ab3=9
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56481=logs=162f98f7-8967-5f47-2782-a1e178ec2ad3=c9934c56-710d-5f85-d2b8-28ec1fd700ed=9





[jira] [Closed] (FLINK-34135) A number of ci failures with Access to the path '.../_work/_temp/containerHandlerInvoker.js' is denied.

2024-01-23 Thread Jeyhun Karimov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov closed FLINK-34135.
--
Resolution: Fixed

> A number of ci failures with Access to the path 
> '.../_work/_temp/containerHandlerInvoker.js' is denied.
> ---
>
> Key: FLINK-34135
> URL: https://issues.apache.org/jira/browse/FLINK-34135
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Reporter: Sergey Nuyanzin
>Assignee: Jeyhun Karimov
>Priority: Critical
>  Labels: test-stability
>
> There are a number of builds failing with something like 
> {noformat}
> ##[error]Access to the path 
> '/home/agent03/myagent/_work/_temp/containerHandlerInvoker.js' is denied.
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56490=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=fb588352-ef18-568d-b447-699986250ccb
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56481=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=554d7c3f-d38e-55f4-96b4-ada3a9cb7d6f=9
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56481=logs=fa307d6d-91b1-5ab6-d460-ef50f552b1fe=1798d435-832b-51fe-a9ad-efb9abf4ab04=9
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56481=logs=a1ac4ce4-9a4f-5fdb-3290-7e163fba19dc=e4c57254-ec06-5788-3f8e-5ad5dffb418e=9
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56481=logs=2c3cbe13-dee0-5837-cf47-3053da9a8a78=56881383-f398-5091-6b3b-22a7eeb7cfa8=9
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56481=logs=b0a398c0-685b-599c-eb57-c8c2a771138e=2d9c27d0-8dbb-5be9-7271-453f74f48ab3=9
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56481=logs=162f98f7-8967-5f47-2782-a1e178ec2ad3=c9934c56-710d-5f85-d2b8-28ec1fd700ed=9





[jira] [Closed] (FLINK-34098) Not enough Azure Pipeline CI runners available?

2024-01-23 Thread Jeyhun Karimov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov closed FLINK-34098.
--
Resolution: Fixed

It was a disk-full issue. I reported the recurring error (that leads to the big 
maven log file) in FLINK-34155

> Not enough Azure Pipeline CI runners available?
> ---
>
> Key: FLINK-34098
> URL: https://issues.apache.org/jira/browse/FLINK-34098
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.18.0, 1.17.2, 1.19.0
>Reporter: Matthias Pohl
>Assignee: Jeyhun Karimov
>Priority: Critical
>
> CI takes longer than usual. There might be an issue with the runner pool (on 
> the Alibaba VMs)?





[jira] [Commented] (FLINK-34155) Recurring SqlExecutionException

2024-01-19 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808539#comment-17808539
 ] 

Jeyhun Karimov commented on FLINK-34155:


Hi [~lincoln.86xy] for example 
{{org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase}} is the one 
failing.

> Recurring SqlExecutionException
> ---
>
> Key: FLINK-34155
> URL: https://issues.apache.org/jira/browse/FLINK-34155
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Jeyhun Karimov
>Priority: Blocker
>  Labels: test
> Attachments: disk-full.log
>
>
> When analyzing a very large Maven log file in our CI system, I found that 
> there is a recurring {{SqlExecutionException}} (a subset of the log file is 
> attached):
>  
> {{org.apache.flink.table.gateway.service.utils.SqlExecutionException: Only 
> 'INSERT/CREATE TABLE AS' statement is allowed in Statement Set or use 'END' 
> statement to submit Statement Set.}}
>  
>  
> which leads to:
>  
> {{06:31:41,155 [flink-rest-server-netty-worker-thread-22] ERROR 
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler [] 
> - Unhandled exception.}}
>  





[jira] [Created] (FLINK-34155) Recurring SqlExecutionException

2024-01-18 Thread Jeyhun Karimov (Jira)
Jeyhun Karimov created FLINK-34155:
--

 Summary: Recurring SqlExecutionException
 Key: FLINK-34155
 URL: https://issues.apache.org/jira/browse/FLINK-34155
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.8.0
Reporter: Jeyhun Karimov
 Attachments: disk-full.log

When analyzing a very large Maven log file in our CI system, I found that there 
is a recurring {{SqlExecutionException}} (a subset of the log file is attached):

 
{{org.apache.flink.table.gateway.service.utils.SqlExecutionException: Only 
'INSERT/CREATE TABLE AS' statement is allowed in Statement Set or use 'END' 
statement to submit Statement Set.}}
 
 

which leads to:

 
{{06:31:41,155 [flink-rest-server-netty-worker-thread-22] ERROR 
org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler [] - 
Unhandled exception.}}
 





[jira] [Commented] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.

2024-01-05 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803626#comment-17803626
 ] 

Jeyhun Karimov commented on FLINK-33996:


Thanks for reporting the issue [~hackergin] and thanks for your comment 
[~xuyangzhong]. I mostly agree with [~xuyangzhong]. An alternative (or simpler) 
solution would be to add an extra check in {{FlinkRelUtil::mergeable}} 
function, since all projection/calc merging rules use the result of this 
function before merging.

[~hackergin] IMO adding a new rule would be tricky since

- We would need to inject the new rule into multiple places in the optimization 
process (codebase), because 1) project and calc merging rules are used several 
times and 2) the order of the new optimization rule will matter
- If we add a new rule, and the two project/calc nodes are already merged (by 
the existing merging rules), it will be non-trivial for our new rule to 
roll back (separate the merged project/calc nodes)

What do you guys think? Also, please feel free to check/review the PR.
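
To make the suggested extra check more concrete, here is a minimal sketch under a simplified expression model. It does not use Calcite's RexNode API or the real signature of {{FlinkRelUtil::mergeable}}; the names MergeCheckSketch, BottomExpr and mergeable below are hypothetical. The idea is only to refuse a merge when a non-trivial bottom expression would be inlined more than once.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model; this is not Flink's or Calcite's API.
public class MergeCheckSketch {

    /** A bottom-project expression: a plain field reference or a computed call. */
    static final class BottomExpr {
        final String text;
        final boolean trivial; // true for plain input references such as $0

        BottomExpr(String text, boolean trivial) {
            this.text = text;
            this.trivial = trivial;
        }
    }

    /**
     * Returns false when merging would inline a non-trivial bottom expression
     * more than once. topRefs lists, per top-project expression, the indices
     * of the bottom fields it references.
     */
    static boolean mergeable(List<List<Integer>> topRefs, List<BottomExpr> bottom) {
        int[] useCount = new int[bottom.size()];
        for (List<Integer> refs : topRefs) {
            for (int idx : refs) {
                useCount[idx]++;
            }
        }
        for (int i = 0; i < bottom.size(); i++) {
            if (!bottom.get(i).trivial && useCount[i] > 1) {
                return false; // merging would duplicate the computation
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Bottom project: a = REGEXP_REPLACE($0, 'aaa', 'bbb')
        List<BottomExpr> bottom =
                Arrays.asList(new BottomExpr("REGEXP_REPLACE($0, 'aaa', 'bbb')", false));
        // Top project: a || 'a' and a || 'b' both reference bottom field 0.
        List<List<Integer>> topRefs = Arrays.asList(Arrays.asList(0), Arrays.asList(0));
        System.out.println(mergeable(topRefs, bottom)); // prints false
    }
}
```

With such a check, the query from the issue description would keep its two Calc nodes, while projects that reference an expensive field only once could still be merged.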

> Support disabling project rewrite when multiple exprs in the project 
> reference the same sub project field.
> --
>
> Key: FLINK-33996
> URL: https://issues.apache.org/jira/browse/FLINK-33996
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Feng Jin
>Priority: Major
>  Labels: pull-request-available
>
> When multiple top projects reference the same bottom project, project rewrite 
> rules may result in complex projects being calculated multiple times.
> Take the following SQL as an example:
> {code:sql}
> create table test_source(a varchar) with ('connector'='datagen');
> explain plan for select a || 'a' as a, a || 'b' as b FROM (select 
> REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
> {code}
> The final SQL plan is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>    +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, 
> ||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> {code}
> It can be observed that after the project rewrite, REGEXP_REPLACE is calculated twice. 
> Generally speaking, regular expression matching is a time-consuming operation 
> and we usually do not want it to be calculated multiple times. Therefore, for 
> this scenario, we can support disabling project rewrite.
> After disabling some rules, the final plan we obtained is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>    +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
>    +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
>    +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> {code}
> After testing, we probably need to modify these few rules:
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectCalcMergeRule





[jira] [Commented] (FLINK-33446) SubQueryAntiJoinTest#testMultiNotExistsWithCorrelatedOnWhere_NestedCorrelation doesn't produce the correct plan

2024-01-03 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802090#comment-17802090
 ] 

Jeyhun Karimov commented on FLINK-33446:


Hi [~fsk119] I scanned through the issue a bit. 

I can confirm that {{SqlToRelConverter}} generates the correct plan. The issue 
IMO is related to {{RelDecorrelator.decorrelateQuery}} 
({{FlinkDecorrelateProgram::optimize}}), specifically this line:


{code:java}
val result = RelDecorrelator.decorrelateQuery(root)
{code}

the input plan ({{root}}) is:


{code:java}
LogicalProject(d2=[$0], d3=[$1])
  LogicalProject(d2=[$0], d3=[$1])
    LogicalFilter(condition=[IS NULL($2)])
      LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}])
        LogicalProject(d2=[+(2, $0)], d3=[+(3, $1)])
          LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
        LogicalAggregate(group=[{0}])
          LogicalProject(i=[true])
            LogicalProject(d1=[$0])
              LogicalFilter(condition=[AND(=($0, $cor0.d2), IS NULL($1))])
                LogicalJoin(condition=[true], joinType=[left], variablesSet=[[$cor1, $cor0]])
                  LogicalProject(d1=[+($0, 1)])
                    LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
                  LogicalAggregate(group=[{0}])
                    LogicalProject(i=[true])
                      LogicalFilter(condition=[AND(=($0, $cor1.d1), =($1, $cor1.d1), =(CAST($2):BIGINT, $cor0.d3))])
                        LogicalProject(d4=[+($0, 4)], d5=[+($0, 5)], d6=[+($0, 6)])
                          LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
{code}


and the output of the function call ({{RelDecorrelator.decorrelateQuery}}) is:


{code:java}
LogicalProject(d2=[$0], d3=[$1], d4=[$4])
  LogicalFilter(condition=[IS NULL($5)])
    LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
      LogicalProject(d2=[+(2, $0)], d3=[+(3, $1)])
        LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
      LogicalProject(d11=[$0], $f3=[$1], d4=[$2], $f4=[true])
        LogicalAggregate(group=[{0, 1, 2}])
          LogicalProject(d11=[$0], $f3=[$1], d4=[$2])
            LogicalFilter(condition=[IS NULL($3)])
              LogicalJoin(condition=[true], joinType=[left])
                LogicalFilter(condition=[IS NOT NULL($0)])
                  LogicalProject(d1=[+($0, 1)])
                    LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
                LogicalProject($f3=[$0], d4=[$1], $f2=[true])
                  LogicalAggregate(group=[{0, 1}])
                    LogicalProject($f3=[$3], d4=[$0])
                      LogicalFilter(condition=[AND(=($1, $0), =(CAST($2):BIGINT, $3))])
                        LogicalProject(d4=[+($0, 4)], d5=[+($0, 5)], d6=[+($0, 6)], $f3=[CAST(+($0, 6)):BIGINT])
                          LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
{code}

WDYT?



> SubQueryAntiJoinTest#testMultiNotExistsWithCorrelatedOnWhere_NestedCorrelation
>  doesn't produce the correct plan
> ---
>
> Key: FLINK-33446
> URL: https://issues.apache.org/jira/browse/FLINK-33446
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Shengkai Fang
>Priority: Major
>
> Although this test doesn't throw an exception, the final plan produces 3 
> columns rather than 2 after optimization.
> {code:java}
> LogicalProject(inputs=[0..1], exprs=[[$4]])
> +- LogicalFilter(condition=[IS NULL($5)])
>+- LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
>   :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
>   :  +- LogicalTableScan(table=[[default_catalog, default_database, l, 
> source: [TestTableSource(a, b, c)]]])
>   +- LogicalProject(inputs=[0..2], exprs=[[true]])
>  +- LogicalAggregate(group=[{0, 1, 2}])
> +- LogicalProject(inputs=[0..2])
>+- LogicalFilter(condition=[IS NULL($3)])
>   +- LogicalJoin(condition=[true], joinType=[left])
>  :- LogicalFilter(condition=[IS NOT NULL($0)])
>  :  +- LogicalProject(exprs=[[+($0, 1)]])
>  : +- LogicalTableScan(table=[[default_catalog, 
> default_database, r, source: [TestTableSource(d, e, f)]]])
>  +- LogicalProject(inputs=[0..1], exprs=[[true]])
> +- 

[jira] [Commented] (FLINK-33756) Missing record with CUMULATE/HOP windows using an optimization

2023-12-14 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796939#comment-17796939
 ] 

Jeyhun Karimov commented on FLINK-33756:


[~jhughes] Yes, it is just printing/logging in various places of the codebase.
If you are not working on this issue (or if it is not something urgent), you can 
assign it to me; I will try to come up with a deterministic solution to avoid the 
flakiness. 

> Missing record with CUMULATE/HOP windows using an optimization
> --
>
> Key: FLINK-33756
> URL: https://issues.apache.org/jira/browse/FLINK-33756
> Project: Flink
>  Issue Type: Bug
>Reporter: Jim Hughes
>Priority: Major
>
> I have seen an optimization cause a window to fail to emit a record.
> With the optimization `TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED` set to 
> true, 
> the configuration AggregatePhaseStrategy.TWO_PHASE set, using a HOP or 
> CUMULATE window with an offset, a record can be sent which causes one of the 
> multiple active windows to fail to emit a record.
> The linked code 
> (https://github.com/jnh5y/flink/commit/ec90aa501d86f95559f8b22b0610e9fb786f05d4)
>  modifies the `WindowAggregateJsonITCase` to demonstrate the case.  
>  
> The test `testDistinctSplitDisabled` shows the expected behavior.  The test 
> `testDistinctSplitEnabled` tests the above configurations and shows that one 
> record is missing from the output.  





[jira] [Commented] (FLINK-33756) Missing record with CUMULATE/HOP windows using an optimization

2023-12-13 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796372#comment-17796372
 ] 

Jeyhun Karimov commented on FLINK-33756:


Hi [~jhughes] I had a chance to look at the issue. I share my findings below.

So, when we enable 
{{OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED}}, the 
following optimized execution plan is produced:


{code}
Sink(table=[default_catalog.default_database.MySink], fields=[name, $f1, $f2, window_start, window_end])
+- GlobalWindowAggregate(groupBy=[name], window=[CUMULATE(win_end=[$window_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, MAX(max$0) AS $f1, $SUM0(sum$1) AS $f2, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[name]])
      +- LocalWindowAggregate(groupBy=[name], window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, MAX($f5_0) AS max$0, $SUM0($f6_0) AS sum$1, slice_end('w$) AS $window_end])
         +- Calc(select=[name, window_start, window_end, $f5, $f6, $f3 AS $f5_0, $f4 AS $f6_0])
            +- GlobalWindowAggregate(groupBy=[name, $f5, $f6], window=[CUMULATE(slice_end=[$slice_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, $f5, $f6, MAX(max$0) AS $f3, COUNT(distinct$0 count$1) AS $f4, start('w$) AS window_start, end('w$) AS window_end])
               +- Exchange(distribution=[hash[name, $f5, $f6]])
                  +- LocalWindowAggregate(groupBy=[name, $f5, $f6], window=[CUMULATE(time_col=[rowtime], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, $f5, $f6, MAX(double) FILTER $g_1 AS max$0, COUNT(distinct$0 int) FILTER $g_2 AS count$1, DISTINCT(int) AS distinct$0, slice_end('w$) AS $slice_end])
                     +- Calc(select=[name, double, int, $f5, $f6, ($e = 1) AS $g_1, ($e = 2) AS $g_2, rowtime])
                        +- Expand(projects=[{name, double, int, $f5, null AS $f6, 1 AS $e, rowtime}, {name, double, int, null AS $f5, $f6, 2 AS $e, rowtime}])
                           +- Calc(select=[name, double, int, MOD(HASH_CODE(double), 1024) AS $f5, MOD(HASH_CODE(int), 1024) AS $f6, Reinterpret(TO_TIMESTAMP(ts)) AS rowtime])
                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[int, double, name, ts], metadata=[], watermark=[-(TO_TIMESTAMP(ts), 1000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[int, double, name, ts])
{code}


As we can see, there are two window operators (both with {{Local-Global 
optimization}}). As a reminder, the missing record is "+I[b, 3.0, 
1, 2020-10-10T00:00:31, 2020-10-10T00:00:41]".

As the schema of the second {{LocalWindowAggregate}} shows, it uses 
{{window_start}} and {{window_end}} to calculate the {{CUMULATE}} windows. At this 
point (at the second {{LocalWindowAggregate}}), our "missing" record looks 
like "+I(b,2020-10-10T00:00:31,2020-10-10T00:00:41,0,null,3.0,0)". So, at this 
point, we have already lost the original event time of the record. 


As a result, the flaky behaviour happens because of the calling order between 
{{SlicingWindowOperator::processWatermark}}->{{AbstractWindowAggProcessor::advanceProgress}}
 and {{SlicingWindowOperator::processElement}}:

- If {{processWatermark}} is called before {{processElement}}, then 
{{currentProgress}} is updated to {{1602288041000}}. When 
{{processElement}} is called afterwards, it considers the window already 
fired and drops the element.

- If {{processElement}} is called before {{processWatermark}}, then the 
record is processed as expected. 
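
The two orderings can be illustrated with a small, self-contained sketch. This is a simplified model and not Flink's operator code: the class WatermarkOrderSketch and its fields are hypothetical, and the drop rule (a window whose end is not after the current progress counts as already fired) is an assumption that may differ from the exact comparison in {{SlicingWindowOperator}}.

```java
// Hypothetical, simplified model of the ordering problem; not Flink's operator code.
public class WatermarkOrderSketch {

    long currentProgress = Long.MIN_VALUE;
    int emitted = 0;
    int dropped = 0;

    /** Mimics advancing the operator's progress when a watermark arrives. */
    void processWatermark(long timestampMillis) {
        currentProgress = Math.max(currentProgress, timestampMillis);
    }

    /**
     * Mimics an element that carries only its window end, because the original
     * event time is no longer available. Assumed rule: a window whose end is
     * not after the current progress counts as already fired.
     */
    void processElement(long windowEndMillis) {
        if (windowEndMillis <= currentProgress) {
            dropped++;
        } else {
            emitted++;
        }
    }

    public static void main(String[] args) {
        long windowEnd = 1602288041000L; // 2020-10-10T00:00:41 UTC

        // Ordering 1: the watermark arrives first, so the element is dropped.
        WatermarkOrderSketch watermarkFirst = new WatermarkOrderSketch();
        watermarkFirst.processWatermark(windowEnd);
        watermarkFirst.processElement(windowEnd);
        System.out.println("watermark first: dropped=" + watermarkFirst.dropped);

        // Ordering 2: the element arrives first, so it is kept.
        WatermarkOrderSketch elementFirst = new WatermarkOrderSketch();
        elementFirst.processElement(windowEnd);
        elementFirst.processWatermark(windowEnd);
        System.out.println("element first: emitted=" + elementFirst.emitted);
    }
}
```

The same record is either kept or dropped depending only on which call happens first, which is consistent with the flaky test result.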

Is this something expected? WDYT?


> Missing record with CUMULATE/HOP windows using an optimization
> --
>
> Key: FLINK-33756
> URL: https://issues.apache.org/jira/browse/FLINK-33756
> Project: Flink
>  Issue Type: Bug
>Reporter: Jim Hughes
>Priority: Major
>
> I have seen an optimization cause a window to fail to emit a record.
> With the optimization `TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED` set to 
> true, 
> the configuration AggregatePhaseStrategy.TWO_PHASE set, using a HOP or 
> CUMULATE window with an offset, a record can be sent which causes one of the 
> multiple active windows to fail to emit a record.
> The linked code 
> (https://github.com/jnh5y/flink/commit/ec90aa501d86f95559f8b22b0610e9fb786f05d4)
>  modifies the `WindowAggregateJsonITCase` to demonstrate the case.  
>  
> The test `testDistinctSplitDisabled` shows the expected behavior.  The test 
> `testDistinctSplitEnabled` tests the above configurations and shows that one 
> record is missing from the output.  





[jira] [Commented] (FLINK-33622) table.optimizer.reuse-sub-plan-enabled doesn't work when a Table is converted to a DataStream

2023-12-08 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794590#comment-17794590
 ] 

Jeyhun Karimov commented on FLINK-33622:


Hi [~sap1ens] thanks for reporting the issue. 

First, converting a {{Table}} to a {{DataStream}} and back to a {{Table}} 
indeed leads to some metadata loss. As a result, the optimizer cannot reuse the common 
plan(s).
Second, why don't you apply the filtering (that you did with {{DataStream}}) 
with the {{Table}} API?


> table.optimizer.reuse-sub-plan-enabled doesn't work when a Table is converted 
> to a DataStream 
> --
>
> Key: FLINK-33622
> URL: https://issues.apache.org/jira/browse/FLINK-33622
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Yaroslav Tkachenko
>Priority: Major
> Attachments: Screenshot 2023-11-22 at 11.09.46 AM.png, Screenshot 
> 2023-11-22 at 11.10.29 AM.png
>
>
> I have a source (a DataStream converted to a Table), a SQL transformation 
> (really anything, could be a join or a simple "SELECT * FROM"), and *two* 
> Table API sinks (added via a 
> StatementSet).
> Here's the execution plan for this case:
> {code:java}
> Calc(select=[id, address, amount])(reuse_id=[1])
> +- DropUpdateBefore
>    +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address])
>
> Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- Reused(reference_id=[1])
>
> Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- Reused(reference_id=[1]) {code}
> As you can see, a transformation is reused by both sinks. 
> In another case, before writing a transformation to one of the sinks, I 
> convert the Table to a DataStream and then back to a Table (I actually apply 
> some filtering on the DataStream, but the problem persists even after 
> removing it, so it's irrelevant).
> In this case, sinks don't reuse the results of the transformation; here's an 
> execution plan:
> {code:java}
> Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- TableSourceScan(table=[[*anonymous_datastream_source$3*]], fields=[id, address, amount])
>
> Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- Calc(select=[id, address, amount])
>    +- DropUpdateBefore
>       +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address]) {code}
> So, the data is processed twice. It could be a big problem for a heavy 
> stateful operation. 
> This feels like a bug in the optimizer. The same situation can be achieved by 
> turning off *table.optimizer.reuse-sub-plan-enabled* option.
>  
>  





[jira] [Commented] (FLINK-33666) MergeTableLikeUtil uses different constraint name than Schema

2023-12-06 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793590#comment-17793590
 ] 

Jeyhun Karimov commented on FLINK-33666:


Hi [~twalthr] can you please assign this task to me or give me access to 
self-assign tasks?

> MergeTableLikeUtil uses different constraint name than Schema
> -
>
> Key: FLINK-33666
> URL: https://issues.apache.org/jira/browse/FLINK-33666
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> {{MergeTableLikeUtil}} uses a different algorithm to name constraints than 
> {{Schema}}. 
> {{Schema}} includes the column names.
> {{MergeTableLikeUtil}} uses a hashCode which means it might depend on JVM 
> specifics.
> For consistency we should use the same algorithm. I propose to use 
> {{Schema}}'s logic.





[jira] [Commented] (FLINK-33670) Public operators cannot be reused in multi sinks

2023-12-05 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793232#comment-17793232
 ] 

Jeyhun Karimov commented on FLINK-33670:


Hi [~zicat] I tried with the Flink master branch; below is the plan I get for your 
case. Looking at the optimized execution plan, it seems the deduplicate part 
(reuse_id=1) is reused between the two sinks. Can you confirm?

 

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.print1], fields=[id, ts])
+- LogicalProject(id=[$0], ts=[$1])
   +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[$3])
      +- LogicalFilter(condition=[=($3, 1)])
         +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[ROW_NUMBER() OVER 
(PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
            +- LogicalTableScan(table=[[default_catalog, default_database, 
source]])

LogicalSink(table=[default_catalog.default_database.print2], fields=[id, 
EXPR$1, EXPR$2])
+- LogicalProject(id=[$1], EXPR$1=[TUMBLE_START($0)], EXPR$2=[$2])
   +- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
      +- LogicalProject($f0=[$TUMBLE($1, 2:INTERVAL SECOND)], id=[$0], 
v=[$2])
         +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[$3])
            +- LogicalFilter(condition=[=($3, 1)])
               +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[ROW_NUMBER() 
OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
                  +- LogicalTableScan(table=[[default_catalog, 
default_database, source]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.print1], fields=[id, ts])
+- Calc(select=[id, ts], where=[=(w0$o0, 1)])
   +- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window#0=[ROW_NUMBER(*) 
AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[id, ts, v, 
$3, w0$o0])
      +- Sort(orderBy=[id ASC, $3 ASC])
         +- Calc(select=[id, ts, v, PROCTIME() AS $3])
            +- Exchange(distribution=[hash[id]])
               +- TableSourceScan(table=[[default_catalog, default_database, 
source]], fields=[id, ts, v])

Sink(table=[default_catalog.default_database.print2], fields=[id, EXPR$1, 
EXPR$2])
+- Calc(select=[id, w$start AS EXPR$1, EXPR$2])
   +- HashWindowAggregate(groupBy=[id], window=[TumblingGroupWindow('w$, ts, 
2)], properties=[w$start, w$end, w$rowtime], select=[id, SUM(v) AS EXPR$2])
      +- Calc(select=[ts, id, v], where=[=(w0$o0, 1)])
         +- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], 
window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW], select=[id, ts, v, $3, w0$o0])
            +- Sort(orderBy=[id ASC, $3 ASC])
               +- Calc(select=[id, ts, v, PROCTIME() AS $3])
                  +- Exchange(distribution=[hash[id]])
                     +- TableSourceScan(table=[[default_catalog, 
default_database, source]], fields=[id, ts, v])

== Optimized Execution Plan ==
OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window#0=[ROW_NUMBER(*) AS 
w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[id, ts, v, $3, 
w0$o0])(reuse_id=[1])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[id ASC, $3 ASC])
      +- Exchange(distribution=[keep_input_as_is[hash[id]]])
         +- Calc(select=[id, ts, v, PROCTIME() AS $3])
            +- Exchange(distribution=[hash[id]])
               +- TableSourceScan(table=[[default_catalog, default_database, 
source]], fields=[id, ts, v])

Sink(table=[default_catalog.default_database.print1], fields=[id, ts])
+- Calc(select=[id, ts], where=[(w0$o0 = 1)])
   +- Reused(reference_id=[1])

Sink(table=[default_catalog.default_database.print2], fields=[id, EXPR$1, 
EXPR$2])
+- Calc(select=[id, w$start AS EXPR$1, EXPR$2])
   +- HashWindowAggregate(groupBy=[id], window=[TumblingGroupWindow('w$, ts, 
2)], properties=[w$start, w$end, w$rowtime], select=[id, SUM(v) AS EXPR$2])
      +- Exchange(distribution=[keep_input_as_is[hash[id]]])
         +- Calc(select=[ts, id, v], where=[(w0$o0 = 1)])
            +- Exchange(distribution=[keep_input_as_is[hash[ts]]])
               +- Reused(reference_id=[1])

 

 

 

> Public operators cannot be reused in multi sinks
> 
>
> Key: FLINK-33670
> URL: https://issues.apache.org/jira/browse/FLINK-33670
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Lyn Zhang
>Priority: Major
> Attachments: image-2023-11-28-14-31-30-153.png
>
>
> Dear all:
>    I find that some public operators cannot be reused when submitting a job 
> with multiple sinks. I have an example as follows:
> {code:java}
> CREATE TABLE source (
>     id              STRING,
>     ts              TIMESTAMP(3),
>     v              BIGINT,
>     WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> ) WITH 

[jira] [Commented] (FLINK-32986) The new createTemporaryFunction has some regression of type inference compare to the deprecated registerFunction

2023-12-02 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792406#comment-17792406
 ] 

Jeyhun Karimov commented on FLINK-32986:


Hi [~lincoln.86xy] could you please assign this task to me or give me 
access to self-assign the task? Thanks!

> The new createTemporaryFunction has some regression of type inference compare 
> to the deprecated registerFunction
> 
>
> Key: FLINK-32986
> URL: https://issues.apache.org/jira/browse/FLINK-32986
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API
>Affects Versions: 1.18.0, 1.17.1
>Reporter: lincoln lee
>Priority: Major
>  Labels: pull-request-available
>
> Currently, `LookupJoinITCase#testJoinTemporalTableWithUdfFilter` uses a legacy 
> form of function registration:
> {code}
> tEnv.registerFunction("add", new TestAddWithOpen)
> {code}
> It works fine with the SQL call `add(T.id, 2) > 3` but fails when switching to 
> the new API:
> {code}
> tEnv.createTemporaryFunction("add", classOf[TestAddWithOpen])
> // or this
> tEnv.createTemporaryFunction("add", new TestAddWithOpen)
> {code}
> exception:
> {code}
> Caused by: org.apache.flink.table.api.ValidationException: Invalid function 
> call:
> default_catalog.default_database.add(BIGINT, INT NOT NULL)
>   at 
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:193)
>   at 
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
>   at 
> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:753)
>   at 
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:499)
>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:335)
>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:231)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6302)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6287)
>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1869)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1860)
>   at 
> org.apache.calcite.sql.type.SqlTypeUtil.deriveType(SqlTypeUtil.java:200)
>   at 
> org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:47)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:2050)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:2055)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4338)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3410)
>   at 
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:154)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3282)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3603)
>   at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:64)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:89)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1050)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1025)
>   at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:248)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1000)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
>   ... 49 more
> Caused by: org.apache.flink.table.api.ValidationException: Invalid input 
> arguments. Expected signatures are:
> default_catalog.default_database.add(a BIGINT NOT NULL, b INT NOT NULL)
> default_catalog.default_database.add(a BIGINT NOT NULL, b BIGINT NOT NULL)
>   at 
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:180)
>   at 
> 

[jira] [Commented] (FLINK-31481) Support enhanced show databases syntax

2023-12-02 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792404#comment-17792404
 ] 

Jeyhun Karimov commented on FLINK-31481:


Hi [~taoran] could you please assign this task to me or give me access to 
self-assign the task? Thanks!

> Support enhanced show databases syntax
> --
>
> Key: FLINK-31481
> URL: https://issues.apache.org/jira/browse/FLINK-31481
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: Ran Tao
>Priority: Major
>  Labels: pull-request-available
>
> As FLIP discussed. To avoid bloat, this ticket supports ShowDatabases.





[jira] [Commented] (FLINK-33523) DataType ARRAY fails to cast into Object[]

2023-11-23 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789113#comment-17789113
 ] 

Jeyhun Karimov commented on FLINK-33523:


Hi [~prabhujoseph] thanks for reporting the issue. 
The idea is that the array type (ARRAY<INT NOT NULL>) enforces its elements 
to be NOT NULL.
Flink converts this to primitive types (and not Object types) because 
primitive types cannot be null. 

That is why this code snippet works when you remove the NOT NULL condition 
(Flink then converts the elements to Object types, which can be null).
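
The cast failure can be reproduced without Flink. The sketch below is plain Java, not Flink code: it shows that an int[] cannot be cast to Object[] (the "[I cannot be cast to [Ljava.lang.Object;" error from the report) and boxes the array explicitly instead. The class and method names are hypothetical.

```java
import java.util.Arrays;

public class PrimitiveArrayCast {
    // Hypothetical helper: returns an Object[] view of the field, boxing an
    // int[] into Integer[] first, because int[] is not a subtype of Object[].
    public static Object[] toObjectArray(Object field) {
        if (field instanceof int[]) {
            return Arrays.stream((int[]) field).boxed().toArray(Integer[]::new);
        }
        return (Object[]) field;
    }

    public static void main(String[] args) {
        Object field = new int[] {1, 2, 3};
        try {
            Object[] broken = (Object[]) field; // same cast as in the repro
            System.out.println(broken.length);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
        System.out.println(Arrays.toString(toObjectArray(field)));
    }
}
```

Only the int[] case is handled here; other primitive element types would need their own branch.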



> DataType ARRAY fails to cast into Object[]
> 
>
> Key: FLINK-33523
> URL: https://issues.apache.org/jira/browse/FLINK-33523
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.18.0
>Reporter: Prabhu Joseph
>Priority: Major
>
> When upgrading Iceberg's Flink version to 1.18, we found the Flink-related 
> unit test case broken due to this issue. The below code used to work fine in 
> Flink 1.17 but failed after upgrading to 1.18. DataType ARRAY<INT NOT NULL> 
> fails to cast into Object[].
> *Error:*
> {code}
> Exception in thread "main" java.lang.ClassCastException: [I cannot be cast to 
> [Ljava.lang.Object;
> at FlinkArrayIntNotNullTest.main(FlinkArrayIntNotNullTest.java:18)
> {code}
> *Repro:*
> {code}
>   import org.apache.flink.table.data.ArrayData;
>   import org.apache.flink.table.data.GenericArrayData;
>   import org.apache.flink.table.api.EnvironmentSettings;
>   import org.apache.flink.table.api.TableEnvironment;
>   import org.apache.flink.table.api.TableResult;
>   public class FlinkArrayIntNotNullTest {
>     public static void main(String[] args) throws Exception {
>       EnvironmentSettings settings =
>           EnvironmentSettings.newInstance().inBatchMode().build();
>       TableEnvironment env = TableEnvironment.create(settings);
>       env.executeSql(
>           "CREATE TABLE filesystemtable2 (id INT, data ARRAY<INT NOT NULL>) "
>               + "WITH ('connector' = 'filesystem', "
>               + "'path' = '/tmp/FLINK/filesystemtable2', 'format'='json')");
>       env.executeSql("INSERT INTO filesystemtable2 VALUES (4,ARRAY [1,2,3])");
>       TableResult tableResult = env.executeSql("SELECT * from filesystemtable2");
>       // On 1.18 this cast throws: the field is an int[] ([I), not an Object[].
>       ArrayData actualArrayData = new GenericArrayData((Object[])
>           tableResult.collect().next().getField(1));
>     }
>   }
> {code}
> *Analysis:*
> 1. The code works fine with the ARRAY<INT> datatype. The issue happens when 
> using ARRAY<INT NOT NULL>.
> 2. The code works fine when casting into int[] instead of Object[].





[jira] [Commented] (FLINK-33547) SQL primitive array type after upgrading to Flink 1.18.0

2023-11-23 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789111#comment-17789111
 ] 

Jeyhun Karimov commented on FLINK-33547:


Hi [~xccui], the mismatch occurs depending on the input type. 
The data created by the ARRAY[] SQL function becomes a primitive array only 
when the ARRAY elements are NOT NULL.
This makes sense since primitives (e.g., float, int) cannot be null. 

That is why you get the described exception.

> SQL primitive array type after upgrading to Flink 1.18.0
> 
>
> Key: FLINK-33547
> URL: https://issues.apache.org/jira/browse/FLINK-33547
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Xingcan Cui
>Priority: Major
>
> We have some Flink SQL UDFs that use object array (Object[]) arguments and 
> take boxed arrays (e.g., Float[]) as parameters. After upgrading to Flink 
> 1.18.0, the data created by ARRAY[] SQL function became primitive arrays 
> (e.g., float[]) and it caused argument mismatch issues. I'm not sure if it's 
> expected.





[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

2023-10-22 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778478#comment-17778478
 ] 

Jeyhun Karimov commented on FLINK-32940:


Hi [~vsowrirajan] [~337361...@qq.com] [~lsy] [~jark], throwing in my two cents 
here:

Adding _CoreRules.ProjectCorrelateTransposeRule_ is not enough to solve the 
problem, for several reasons:

* Calcite will add two projections (one for the left and one for the right 
input) [1]. Some of these projections can be no-ops (e.g., without 
expressions). This causes a null reference error in 
_BatchPhysicalCorrelateRule.scala: 67_ 
(_Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition))_). That is 
probably why you get this error. 
* However, solving the above issue is probably not enough to get this rule 
working, mainly because of how _CoreRules.ProjectCorrelateTransposeRule_ works. 
Basically, this rule pushes down projects without further handling/correcting 
the references (e.g., LogicalTableFunctionScan will have a stale function 
invocation expression - getCall()). As a result, LogicalTableFunctionScan will 
try to access some field, but this field has already been projected by the 
Calcite rule (there are LogicalProject operator(s) on top). 
* The above issue gets even more complicated when there are more operators 
(e.g., filters and projections) that have dangling references after the Calcite 
rule is applied, or when many nested fields are accessed (this results in 
LogicalCorrelate operators nested in each other).


  
As for a solution, IMO we should either:
# Create a rule that inherits from _CoreRules.ProjectCorrelateTransposeRule_ 
and overrides its _onMatch_ method. We should gracefully handle the downstream 
tree of operators when pushing projections down to the LogicalCorrelate.
# Alternatively, we can use _CoreRules.ProjectCorrelateTransposeRule_ and our 
own rule to match


{code:java}
+- LogicalCorrelate
   :- LogicalProject
{code}

We cannot force matching LogicalTableFunctionScan or LogicalTableScan because 
dangling references can be anywhere in the query plan. We need to 1) find 
all RexCall fields of LogicalTableFunctionScan, 2) check if they exist after 
projection pushdown, 3) if not, find which [new] project expressions they 
correspond to, and 4) rewire them. This potentially requires rewriting 
expressions throughout the whole query plan down to the leaf node. 
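
The four steps can be pictured with a toy model that represents field references as plain integer indices. This is not Calcite code, and all names in it are hypothetical. A pushed-down projection keeps a subset of the input fields, so every reference written against the old row type must be mapped to its new position or reported as dangling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReferenceRewiring {
    // kept: the old field indices retained by the new projection, in output order.
    public static Map<Integer, Integer> buildMapping(List<Integer> kept) {
        Map<Integer, Integer> oldToNew = new HashMap<>();
        for (int newIdx = 0; newIdx < kept.size(); newIdx++) {
            oldToNew.put(kept.get(newIdx), newIdx);
        }
        return oldToNew;
    }

    // Rewrites each old reference to its new index; fails on a dangling reference.
    public static List<Integer> rewire(List<Integer> refs, Map<Integer, Integer> oldToNew) {
        List<Integer> rewired = new ArrayList<>();
        for (int ref : refs) {
            Integer target = oldToNew.get(ref);
            if (target == null) {
                throw new IllegalStateException("Dangling reference to field $" + ref);
            }
            rewired.add(target);
        }
        return rewired;
    }

    public static void main(String[] args) {
        // The projection keeps deptno ($0) and employees ($3) from the table scan.
        Map<Integer, Integer> mapping = buildMapping(Arrays.asList(0, 3));
        // The table function still refers to employees as $3; it must become $1.
        System.out.println(rewire(Arrays.asList(3), mapping));
    }
}
```

In a real plan the references are RexNode trees rather than integers, and the same remapping would have to be repeated for every operator between the projection and the leaf.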

Also, we do not need to merge LogicalProject and LogicalTableScan as part of 
this rule, since other rules will already do it. 

What do you guys think?

[1] 
https://github.com/apache/calcite/blob/c83ac69111fd9e75af5e3615af29a72284667a4a/core/src/main/java/org/apache/calcite/rel/rules/ProjectCorrelateTransposeRule.java#L126

> Support projection pushdown to table source for column projections through 
> UDTF
> ---
>
> Key: FLINK-32940
> URL: https://issues.apache.org/jira/browse/FLINK-32940
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> Currently, Flink doesn't push down columns projected through UDTF like 
> _UNNEST_ to the table source.
> For example:
> {code:java}
> SELECT t1.deptno, t2.ename FROM db.dept_nested t1, UNNEST(t1.employees) AS 
> t2{code}
> For the above SQL, Flink projects all the columns for DEPT_NESTED rather than 
> only _name_ and {_}employees{_}. If the table source supports nested fields 
> column projection, ideally it should project only _t1.employees.ename_ from 
> the table source.
> Query plan:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(deptno=[$0], ename=[$5])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
> requiredColumns=[{3}])
>    :- LogicalTableScan(table=[[hive_catalog, db, dept_nested]])
>    +- Uncollect
>       +- LogicalProject(employees=[$cor1.employees])
>          +- LogicalValues(tuples=[[{ 0 }]]){code}
> {code:java}
> == Optimized Physical Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], 
> correlate=[table($UNNEST_ROWS$1($cor1.employees))], 
> select=[deptno,name,skillrecord,employees,empno,ename,skills], 
> rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, 
> RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) 
> desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) skillrecord, RecordType:peek_no_expand(BIGINT empno, 
> VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) 
> type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) 
> a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT 
> empno, VARCHAR(2147483647) ename, 
> RecordType:peek_no_expand(VARCHAR(2147483647) type, 

[jira] [Updated] (FLINK-1509) Streaming Group Reduce Combiner

2016-02-01 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-1509:
--
Assignee: (was: Jeyhun Karimov)

> Streaming Group Reduce Combiner
> ---
>
> Key: FLINK-1509
> URL: https://issues.apache.org/jira/browse/FLINK-1509
> Project: Flink
>  Issue Type: New Feature
>  Components: Local Runtime, Optimizer
>Reporter: Fabian Hueske
>Priority: Minor
>
> Combiners for GroupReduce operators are currently always (partially) sorting 
> their input even if the data is already appropriately sorted.
> It should be relatively easy to add chained and unchained implementations of 
> a streaming group reduce combiner strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (FLINK-1509) Streaming Group Reduce Combiner

2016-02-01 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-1509:
--
Comment: was deleted

(was: I will give this problem a try)

> Streaming Group Reduce Combiner
> ---
>
> Key: FLINK-1509
> URL: https://issues.apache.org/jira/browse/FLINK-1509
> Project: Flink
>  Issue Type: New Feature
>  Components: Local Runtime, Optimizer
>Reporter: Fabian Hueske
>Assignee: Jeyhun Karimov
>Priority: Minor
>
> Combiners for GroupReduce operators are currently always (partially) sorting 
> their input even if the data is already appropriately sorted.
> It should be relatively easy to add chained and unchained implementations of 
> a streaming group reduce combiner strategy.





[jira] [Updated] (FLINK-2184) Cannot get last element with maxBy/minBy

2016-02-01 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-2184:
--
Assignee: (was: Jeyhun Karimov)

> Cannot get last element with maxBy/minBy
> 
>
> Key: FLINK-2184
> URL: https://issues.apache.org/jira/browse/FLINK-2184
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API, Streaming
>Reporter: Gábor Hermann
>Priority: Minor
>
> In the streaming Scala API there is no method
> {{maxBy(int positionToMaxBy, boolean first)}}
> nor
> {{minBy(int positionToMinBy, boolean first)}}
> like in the Java API, where _first_ set to {{false}} indicates that the latest 
> found element will be returned.
> These methods should be added to the Scala API too, in order to be consistent.
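
The role of the _first_ flag can be shown with a small stand-alone sketch. It is a plain-Java illustration of tie-breaking, not the Flink implementation, and the class and method names are hypothetical: when several elements share the maximum value at the compared position, the flag decides whether the earliest or the latest of them is kept.

```java
import java.util.Arrays;
import java.util.List;

public class MaxByTieBreak {
    // Returns the element with the largest value at `position`. On ties,
    // first == true keeps the earliest element, first == false the latest.
    public static int[] maxBy(List<int[]> elements, int position, boolean first) {
        int[] best = null;
        for (int[] e : elements) {
            if (best == null
                    || e[position] > best[position]
                    || (!first && e[position] == best[position])) {
                best = e;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<int[]> input = Arrays.asList(
                new int[] {5, 1}, new int[] {7, 2}, new int[] {7, 3});
        System.out.println(Arrays.toString(maxBy(input, 0, true)));
        System.out.println(Arrays.toString(maxBy(input, 0, false)));
    }
}
```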





[jira] [Updated] (FLINK-3177) Add Stream Operator that Discards Late Elements

2016-02-01 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-3177:
--
Assignee: (was: Jeyhun Karimov)

> Add Stream Operator that Discards Late Elements
> ---
>
> Key: FLINK-3177
> URL: https://issues.apache.org/jira/browse/FLINK-3177
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> We should introduce an operator (and API call on DataStream) that allows 
> users to discard late elements (with respect to their timestamps and the 
> current watermark). The operator could have two modes, one for silently 
> discarding elements that are late and one for failing the job if late 
> elements arrive (as a sort of sanity check).





[jira] [Updated] (FLINK-2390) Replace iteration timeout with algorithm for detecting termination

2016-02-01 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-2390:
--
Assignee: (was: Jeyhun Karimov)

> Replace iteration timeout with algorithm for detecting termination
> --
>
> Key: FLINK-2390
> URL: https://issues.apache.org/jira/browse/FLINK-2390
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gyula Fora
> Fix For: 1.0.0
>
>
> Currently the user can set a timeout which will shut down the iteration 
> source/sink nodes if no new data is received during that time to allow 
> program termination in iterative streaming jobs.
> This method is used due to the non-trivial nature of termination in iterative 
> streaming jobs. While termination is not a main concern in long running 
> streaming jobs, this behaviour makes iterative tests non-deterministic and 
> they often fail on travis due to the timeout. Also setting a timeout can 
> cause jobs to terminate prematurely.
> I propose to remove iteration timeouts and replace them with the following 
> algorithm for detecting termination:
> - We first identify loop edges in the jobgraph (the channels from the 
> iteration sources to the head operators).
> - Once the head operators (the ones with loop input) finish with all their 
> non-loop inputs, they broadcast a marker to their outputs.
> - Each operator will broadcast a marker once it has received a marker from 
> all its non-finished inputs.
> - Iteration sources are terminated when they receive 2 consecutive markers 
> without receiving any record in between.
> The idea behind the algorithm is to find out when no more outputs are 
> generated from the operators inside an iteration after their normal inputs 
> are finished.
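
The last rule of the proposal (terminate after two consecutive markers with no record in between) can be sketched as a small state tracker. This is only an illustration of that one rule. The class and method names are hypothetical, and marker propagation between operators is not modelled.

```java
public class IterationTerminationTracker {
    private int consecutiveMarkers = 0;

    // Any record seen between markers resets the count.
    public void onRecord() {
        consecutiveMarkers = 0;
    }

    // Returns true once two markers have arrived with no record in between.
    public boolean onMarker() {
        consecutiveMarkers++;
        return consecutiveMarkers >= 2;
    }

    public static void main(String[] args) {
        IterationTerminationTracker tracker = new IterationTerminationTracker();
        System.out.println(tracker.onMarker()); // first marker
        tracker.onRecord();                     // a record still circulates in the loop
        System.out.println(tracker.onMarker()); // count restarted
        System.out.println(tracker.onMarker()); // second consecutive marker
    }
}
```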





[jira] [Commented] (FLINK-1509) Streaming Group Reduce Combiner

2016-01-29 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15123577#comment-15123577
 ] 

Jeyhun Karimov commented on FLINK-1509:
---

I will give this problem a try

> Streaming Group Reduce Combiner
> ---
>
> Key: FLINK-1509
> URL: https://issues.apache.org/jira/browse/FLINK-1509
> Project: Flink
>  Issue Type: New Feature
>  Components: Local Runtime, Optimizer
>Reporter: Fabian Hueske
>Priority: Minor
>
> Combiners for GroupReduce operators are currently always (partially) sorting 
> their input even if the data is already appropriately sorted.
> It should be relatively easy to add chained and unchained implementations of 
> a streaming group reduce combiner strategy.





[jira] [Commented] (FLINK-2184) Cannot get last element with maxBy/minBy

2016-01-29 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15123469#comment-15123469
 ] 

Jeyhun Karimov commented on FLINK-2184:
---

If no one is working on this issue, I can give it a try

> Cannot get last element with maxBy/minBy
> 
>
> Key: FLINK-2184
> URL: https://issues.apache.org/jira/browse/FLINK-2184
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API, Streaming
>Reporter: Gábor Hermann
>Priority: Minor
>
> In the streaming Scala API there is no method
> {{maxBy(int positionToMaxBy, boolean first)}}
> nor
> {{minBy(int positionToMinBy, boolean first)}}
> like in the Java API, where _first_ set to {{false}} indicates that the latest 
> found element will be returned.
> These methods should be added to the Scala API too, in order to be consistent.





[jira] [Commented] (FLINK-3089) OperatorState timeout

2016-01-29 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15123391#comment-15123391
 ] 

Jeyhun Karimov commented on FLINK-3089:
---

If nobody is working on this issue, I would like to give it a try.

> OperatorState timeout
> -
>
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
>  Issue Type: New Feature
>Reporter: Niels Basjes
>
> In some use cases (web analytics) there is a need to have a state per visitor 
> on a website (i.e. keyBy(sessionid)).
> At some point the visitor simply leaves and no longer creates new events (so 
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, 
> like "After 30 minutes without events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind 
> of information. But that introduces the buffering effect of the window (which 
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific OperatorState 
> value which I can update afterwards.
> This makes it possible to create a map function that assigns the right value 
> and that discards the state automatically.
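
The requested behaviour can be sketched outside Flink as a keyed map whose entries carry a last-updated timestamp. This is a plain-Java illustration under stated assumptions, not a Flink API: the class and method names are hypothetical, and time is passed in explicitly so the example does not depend on a clock.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class SessionStateWithTimeout<K, V> {
    private static final class Timed<T> {
        T value;
        long lastUpdated;
    }

    private final long timeoutMillis;
    private final Map<K, Timed<V>> state = new HashMap<>();

    public SessionStateWithTimeout(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Storing or updating a value restarts the timeout for that key.
    public void update(K key, V value, long now) {
        Timed<V> entry = state.computeIfAbsent(key, k -> new Timed<>());
        entry.value = value;
        entry.lastUpdated = now;
    }

    public V get(K key) {
        Timed<V> entry = state.get(key);
        return entry == null ? null : entry.value;
    }

    // Discards every entry that has not been updated for at least the timeout.
    public int expire(long now) {
        int removed = 0;
        Iterator<Map.Entry<K, Timed<V>>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue().lastUpdated >= timeoutMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

A caller would invoke expire periodically. Unlike a window, the value stays readable and updatable until the timeout elapses, which avoids the buffering effect the description mentions.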





[jira] [Issue Comment Deleted] (FLINK-3089) OperatorState timeout

2016-01-29 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-3089:
--
Comment: was deleted

(was: If nobody is working on this issue, I would like to give it a try.)

> OperatorState timeout
> -
>
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
>  Issue Type: New Feature
>Reporter: Niels Basjes
>
> In some use cases (web analytics) there is a need to have a state per visitor 
> on a website (i.e. keyBy(sessionid)).
> At some point the visitor simply leaves and no longer creates new events (so 
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, 
> like "After 30 minutes without events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind 
> of information. But that introduces the buffering effect of the window (which 
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific OperatorState 
> value which I can update afterwards.
> This makes it possible to create a map function that assigns the right value 
> and that discards the state automatically.





[jira] [Assigned] (FLINK-1509) Streaming Group Reduce Combiner

2016-01-29 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned FLINK-1509:
-

Assignee: Jeyhun Karimov

> Streaming Group Reduce Combiner
> ---
>
> Key: FLINK-1509
> URL: https://issues.apache.org/jira/browse/FLINK-1509
> Project: Flink
>  Issue Type: New Feature
>  Components: Local Runtime, Optimizer
>Reporter: Fabian Hueske
>Assignee: Jeyhun Karimov
>Priority: Minor
>
> Combiners for GroupReduce operators are currently always (partially) sorting 
> their input even if the data is already appropriately sorted.
> It should be relatively easy to add chained and unchained implementations of 
> a streaming group reduce combiner strategy.





[jira] [Assigned] (FLINK-2390) Replace iteration timeout with algorithm for detecting termination

2016-01-29 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned FLINK-2390:
-

Assignee: Jeyhun Karimov

> Replace iteration timeout with algorithm for detecting termination
> --
>
> Key: FLINK-2390
> URL: https://issues.apache.org/jira/browse/FLINK-2390
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gyula Fora
>Assignee: Jeyhun Karimov
> Fix For: 1.0.0
>
>
> Currently the user can set a timeout which will shut down the iteration 
> source/sink nodes if no new data is received during that time to allow 
> program termination in iterative streaming jobs.
> This method is used due to the non-trivial nature of termination in iterative 
> streaming jobs. While termination is not a main concern in long running 
> streaming jobs, this behaviour makes iterative tests non-deterministic and 
> they often fail on travis due to the timeout. Also setting a timeout can 
> cause jobs to terminate prematurely.
> I propose to remove iteration timeouts and replace them with the following 
> algorithm for detecting termination:
> - We first identify loop edges in the jobgraph (the channels from the 
> iteration sources to the head operators).
> - Once the head operators (the ones with loop input) finish with all their 
> non-loop inputs, they broadcast a marker to their outputs.
> - Each operator will broadcast a marker once it has received a marker from 
> all its non-finished inputs.
> - Iteration sources are terminated when they receive 2 consecutive markers 
> without receiving any record in between.
> The idea behind the algorithm is to find out when no more outputs are 
> generated from the operators inside an iteration after their normal inputs 
> are finished.


