[jira] [Commented] (FLINK-35631) KafkaSource parameter partition.discovery.interval.ms with a default value of 5 minutes does not take effect

2024-06-19 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856151#comment-17856151
 ] 

Martijn Visser commented on FLINK-35631:


[~renqs] Given that you worked on the FLIP, can you give your thoughts on this 
one?

> KafkaSource parameter partition.discovery.interval.ms with a default value of 
> 5 minutes does not take effect
> 
>
> Key: FLINK-35631
> URL: https://issues.apache.org/jira/browse/FLINK-35631
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: elon_X
>Priority: Major
>
> When I start a streaming program that consumes Kafka 
> (flink-connector-kafka-3.1-SNAPSHOT), the Flink job does not automatically 
> detect new partitions after partitions are added to the Kafka topic.
>  
> *Reason*
> In the {{KafkaSourceBuilder}}, this parameter is checked to see if it has 
> been overridden. Since I did not set this parameter, even though the source 
> is {{CONTINUOUS_UNBOUNDED}}, the builder still sets 
> {{partition.discovery.interval.ms = -1}}.
> In the {{KafkaSourceEnumerator}}, the value of 
> {{partition.discovery.interval.ms}} is then -1 instead of the default value 
> of 5 minutes, so automatic partition discovery does not work, and the default 
> value of 5 minutes for {{partition.discovery.interval.ms}} is meaningless.
>  
> A possible solution is to set {{partition.discovery.interval.ms = -1}} only 
> if {{boundedness == Boundedness.BOUNDED}} is true.
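A minimal Java sketch of the proposed guard; the helper method and the local enum are illustrative assumptions, not the actual {{KafkaSourceBuilder}} internals:

{code:java}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class DiscoveryIntervalGuard {
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    // Resolve the interval the enumerator should see: the user's explicit
    // setting if present, the documented 5-minute default for unbounded
    // sources, and -1 (discovery disabled) only for bounded sources.
    static long resolveDiscoveryIntervalMs(Properties props, Boundedness boundedness) {
        if (boundedness == Boundedness.BOUNDED) {
            return -1L; // a bounded read never needs partition discovery
        }
        String userValue = props.getProperty("partition.discovery.interval.ms");
        return userValue != null ? Long.parseLong(userValue) : TimeUnit.MINUTES.toMillis(5);
    }

    public static void main(String[] args) {
        // No explicit setting + unbounded source => 300000 ms, not -1.
        System.out.println(resolveDiscoveryIntervalMs(
                new Properties(), Boundedness.CONTINUOUS_UNBOUNDED));
    }
}
{code}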



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35629) Performance regression in stringRead and stringWrite

2024-06-19 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856172#comment-17856172
 ] 

Piotr Nowojski commented on FLINK-35629:


It's strange. Why would the JMH upgrade cause this? Isn't this yet another case 
of the JIT going crazy due to some unrelated change?

Anyway, +1 for just accepting it.

> Performance regression in stringRead and stringWrite
> 
>
> Key: FLINK-35629
> URL: https://issues.apache.org/jira/browse/FLINK-35629
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
> Fix For: 1.20.0
>
> Attachments: image-2024-06-18-14-52-55-164.png
>
>
> [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.ascii&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.chinese&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.russian&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.128.chinese&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.ascii&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.chinese&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.russian&extr=on&quarts=on&equid=off&env=3&revs=200]
> !image-2024-06-18-14-52-55-164.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35619) Window rank query fails with "must call validate first"

2024-06-19 Thread Dawid Wysakowicz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-35619.

Resolution: Fixed

Fixed in e91d4a16305ddf7d4e34f45c9eec9c0af454a1d4

> Window rank query fails with "must call validate first"
> ---
>
> Key: FLINK-35619
> URL: https://issues.apache.org/jira/browse/FLINK-35619
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> A program:
> {code}
> static final TableTestProgram WINDOW_RANK_HOP_TVF_NAMED_MIN_TOP_1 =
>         TableTestProgram.of(
>                         "window-rank-hop-tvf-named-min-top-n",
>                         "validates window min top-n follows after hop window")
>                 .setupTableSource(
>                         SourceTestStep.newBuilder("bid_t")
>                                 .addSchema(
>                                         "ts STRING",
>                                         "price DECIMAL(10,2)",
>                                         "supplier_id STRING",
>                                         "`bid_time` AS TO_TIMESTAMP(`ts`)",
>                                         "WATERMARK for `bid_time` AS `bid_time` - INTERVAL '1' SECOND")
>                                 .producedValues(
>                                         Row.of(
>                                                 "2020-04-15 08:00:05",
>                                                 new BigDecimal(4.00),
>                                                 "supplier1"))
>                                 .build())
>                 .setupTableSink(
>                         SinkTestStep.newBuilder("sink_t")
>                                 .addSchema("bid_time TIMESTAMP(3)", "supplier_id STRING")
>                                 .consumedValues(
>                                         "+I[2020-04-15T08:00:05, supplier1]",
>                                         "+I[2020-04-15T08:00:05, supplier1]")
>                                 .build())
>                 .runSql("INSERT INTO sink_t(bid_time, supplier_id) "
>                         + "SELECT bid_time, supplier_id\n"
>                         + "  FROM (\n"
>                         + "SELECT\n"
>                         + " bid_time,\n"
>                         + " supplier_id,\n"
>                         + " ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price ASC) AS row_num\n"
>                         + "FROM TABLE(HOP(\n"
>                         + "  DATA => TABLE bid_t,\n"
>                         + "  TIMECOL => DESCRIPTOR(`bid_time`),\n"
>                         + "  SLIDE => INTERVAL '5' SECOND,\n"
>                         + "  SIZE => INTERVAL '10' SECOND))\n"
>                         + "  ) WHERE row_num <= 3")
>                 .build();
> {code}
> fails with:
> {code}
> java.lang.AssertionError: must call validate first
>   at org.apache.calcite.sql.validate.IdentifierNamespace.resolve(IdentifierNamespace.java:256)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2871)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2464)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2378)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2323)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:730)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:716)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3880)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryOrInList(SqlToRelConverter.java:1912)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertExists(SqlToRelConverter.java:1895)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.substituteSubQuery(SqlToRelConverter.java:1421)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.replaceSubQueries(SqlToRelConverter.java:1161)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertCollectionTable(SqlToRelConverter.java:2928)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2511)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2378)
>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConve

[jira] [Assigned] (FLINK-35648) Pipeline job doesn't support multiple routing

2024-06-19 Thread Ruan Hang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruan Hang reassigned FLINK-35648:
-

Assignee: yux

> Pipeline job doesn't support multiple routing
> -
>
> Key: FLINK-35648
> URL: https://issues.apache.org/jira/browse/FLINK-35648
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Assignee: yux
>Priority: Major
>
> Currently, any upstream table could be routed at most once, which means that 
> if we wrote such a route definition:
> routes:
>   - source-table: db.(A|B)
>     sink-table: terminal.one
>   - source-table: db.(B|C)
>     sink-table: terminal.two
> any upstream schema / data changes from db.B will be sent to terminal.one 
> {*}only{*}, not to terminal.two, since they have been handled by the first 
> route rule.
> This ticket suggests adding a route behavior option (FIRST_MATCH / COMPLETE) 
> to configure whether all route rules should be applied or only the first 
> matched rule (for backwards compatibility).
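To make the two proposed behaviors concrete, here is a minimal Java sketch; {{RouteRule}}, {{RouteBehavior}}, and the matching helper are hypothetical illustrations, not Flink CDC's actual classes:

{code:java}
import java.util.ArrayList;
import java.util.List;

public class RouteMatcher {
    // Hypothetical rule: a source-table regex and a sink-table name.
    record RouteRule(String sourcePattern, String sinkTable) {}

    enum RouteBehavior { FIRST_MATCH, COMPLETE }

    // Returns the sink tables a change event for `sourceTable` should reach.
    static List<String> route(List<RouteRule> rules, String sourceTable, RouteBehavior behavior) {
        List<String> sinks = new ArrayList<>();
        for (RouteRule rule : rules) {
            if (sourceTable.matches(rule.sourcePattern())) {
                sinks.add(rule.sinkTable());
                if (behavior == RouteBehavior.FIRST_MATCH) {
                    break; // current behavior: the first matching rule wins
                }
            }
        }
        return sinks;
    }

    public static void main(String[] args) {
        List<RouteRule> rules = List.of(
                new RouteRule("db\\.(A|B)", "terminal.one"),
                new RouteRule("db\\.(B|C)", "terminal.two"));
        System.out.println(route(rules, "db.B", RouteBehavior.FIRST_MATCH)); // [terminal.one]
        System.out.println(route(rules, "db.B", RouteBehavior.COMPLETE));    // [terminal.one, terminal.two]
    }
}
{code}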



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35648) Pipeline job doesn't support multiple routing

2024-06-19 Thread Ruan Hang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856216#comment-17856216
 ] 

Ruan Hang commented on FLINK-35648:
---

[~xiqian_yu] This is an important improvement. Thanks for working on it.

I've assigned it to you.

> Pipeline job doesn't support multiple routing
> -
>
> Key: FLINK-35648
> URL: https://issues.apache.org/jira/browse/FLINK-35648
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Assignee: yux
>Priority: Major
>
> Currently, any upstream table could be routed at most once, which means that 
> if we wrote such a route definition:
> routes:
>   - source-table: db.(A|B)
>     sink-table: terminal.one
>   - source-table: db.(B|C)
>     sink-table: terminal.two
> any upstream schema / data changes from db.B will be sent to terminal.one 
> {*}only{*}, not to terminal.two, since they have been handled by the first 
> route rule.
> This ticket suggests adding a route behavior option (FIRST_MATCH / COMPLETE) 
> to configure whether all route rules should be applied or only the first 
> matched rule (for backwards compatibility).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35636) Streaming File Sink s3 end-to-end test did not finish after 900 seconds

2024-06-19 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856222#comment-17856222
 ] 

Weijie Guo commented on FLINK-35636:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60360&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=9598

> Streaming File Sink s3 end-to-end test did not finish after 900 seconds
> ---
>
> Key: FLINK-35636
> URL: https://issues.apache.org/jira/browse/FLINK-35636
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.17.2
>Reporter: Weijie Guo
>Priority: Major
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60335&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=4916



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35649) Bump Flink version to 1.19.1 in k8s-operator

2024-06-19 Thread Ferenc Csaky (Jira)
Ferenc Csaky created FLINK-35649:


 Summary: Bump Flink version to 1.19.1 in k8s-operator
 Key: FLINK-35649
 URL: https://issues.apache.org/jira/browse/FLINK-35649
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Ferenc Csaky
 Fix For: kubernetes-operator-1.10.0


In FLINK-28915 it came up that the operator is not able to utilize the artifact 
fetching capabilities that were introduced in Flink 1.19 until it is built on 
that version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35639) upgrade to 1.19 with job in HA state with restart strategy crashes job manager

2024-06-19 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856230#comment-17856230
 ] 

Matthias Pohl commented on FLINK-35639:
---

[~chesnay] pointed me to the actual issue because I was wondering why the 
change in FLINK-32570 was actually "overlooked" by our {{japicmp}} checks. The 
problem is that you're actually not following the supported process (as 
documented in the [Flink 
docs|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#restarting-streaming-applications]).
 That results in incompatibilities of internal APIs (the constructor in 
question is package-private). Please use savepoints to migrate jobs. There are 
other internal APIs (the JobGraph itself isn't a stable API, either) that might 
cause problems in your upgrade process.
 # Create a savepoint of the job in the old version.
 # Start the Flink cluster with the upgraded Flink version.
 # Submit the job using the created savepoint to restart the job.
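A minimal Java sketch of step 3, using the documented {{execution.savepoint.path}} option rather than any internal API; the savepoint path below is hypothetical, and in practice most users simply pass {{-s <savepointPath>}} to the new version's CLI:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ResubmitFromSavepoint {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical savepoint path produced in step 1.
        conf.setString("execution.savepoint.path", "/tmp/savepoints/savepoint-5f0898-abc123");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Rebuild the job topology against the new Flink version's APIs here,
        // so the JobGraph is created by the new binaries.
        env.fromElements("Hello World", "Hello Flink").print();
        env.execute("resubmitted-job");
    }
}
{code}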

> upgrade to 1.19 with job in HA state with restart strategy crashes job manager
> --
>
> Key: FLINK-35639
> URL: https://issues.apache.org/jira/browse/FLINK-35639
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.20.0, 1.19.1
> Environment: Download 1.18 and 1.19 binary releases. Add the following 
> to flink-1.19.0/conf/config.yaml and flink-1.18.1/conf/flink-conf.yaml:
> ```yaml
> high-availability: zookeeper
> high-availability.zookeeper.quorum: localhost
> high-availability.storageDir: file:///tmp/flink/recovery
> ```
> Launch zookeeper: docker run --network host zookeeper:latest
> Launch the 1.18 task manager: ./flink-1.18.1/bin/taskmanager.sh start-foreground
> Launch the 1.18 job manager: ./flink-1.18.1/bin/jobmanager.sh start-foreground
> Launch the following job:
> ```java
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.time.Time;
> import java.util.concurrent.TimeUnit;
>
> public class FlinkJob {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.setRestartStrategy(
>                 RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, TimeUnit.SECONDS)));
>         env.fromElements("Hello World", "Hello Flink")
>                 .flatMap(new LineSplitter())
>                 .groupBy(0)
>                 .sum(1)
>                 .print();
>     }
>
>     public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
>             for (String word : value.split(" ")) {
>                 try {
>                     Thread.sleep(12);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 out.collect(new Tuple2<>(word, 1));
>             }
>         }
>     }
> }
> ```
> ```xml
> 4.0.0 org.apache.flink myflinkjob 1.0-SNAPSHOT 1.18.1 1.8 org.apache.flink
> flink-java ${flink.version} org.apache.flink flink-streaming-java
> ${flink.version} org.apache.maven.plugins maven-compiler-plugin 3.8.1
> ${java.version} ${java.version} org.apache.maven.plugins maven-jar-plugin
> 3.1.0 true lib/ FlinkJob
> ```
> Launch the job: ./flink-1.18.1/bin/flink run ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar
> Job has been submitted with JobID 5f0898c964a93a47aa480427f3e2c6c0
> Kill the job manager and task manager, then launch the 1.19.0 job manager: 
> ./flink-1.19.0/bin/jobmanager.sh start-foreground
> Root cause: it looks like the type of delayBetweenAttemptsInterval was changed 
> in 1.19 
> (https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239),
> introducing an incompatibility which is not handled by Flink 1.19. In my 
> opinion, the job manager should not crash when starting in that case.
>Reporter: yazgoo
>Assignee: Matthias Pohl
>Priority: Blocker
>  Labels: pull-request-available
>
> When trying to upgrade a flink cluster from 1.18 to 1.19, with a 1.18 job in 
> zookeeper HA state, I have a jobmanager crash with a ClassCastException, see 
> log below  
>  
> {code:java}
> 2024-06-18 16:58:14,401 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
> occurred in the cluster entrypoint. org.apache.flink.util.FlinkException: 
> JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed.     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 

[jira] [Comment Edited] (FLINK-35639) upgrade to 1.19 with job in HA state with restart strategy crashes job manager

2024-06-19 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856230#comment-17856230
 ] 

Matthias Pohl edited comment on FLINK-35639 at 6/19/24 9:47 AM:


[~chesnay] pointed me to the actual issue because I was wondering why the 
change in FLINK-32570 was actually "overlooked" by our {{japicmp}} checks. The 
problem is that you're actually not following the supported process (as 
documented in the [Flink 
docs|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#restarting-streaming-applications]).
 That results in incompatibilities of internal APIs (the constructor in 
question is package-private). Please use savepoints to migrate jobs. There are 
other internal APIs (the JobGraph itself isn't a stable API, either) that might 
cause problems in your upgrade process.
 # Create a savepoint of the job in the old version.
 # Start the Flink cluster with the upgraded Flink version.
 # Submit the job using the created savepoint to restart the job using the job 
client of the new Flink binaries (to allow for proper JobGraph creation).


was (Author: mapohl):
[~chesnay] pointed me to the actual issue because I was wondering why the 
change in FLINK-32570 was actually "overlooked" by our {{japicmp}} checks. The 
problem is that you're actually not following the supported process (as 
documented in the [Flink 
docs|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#restarting-streaming-applications]).
 That results in incompatibilities of internal APIs (the constructor in 
question is package-private). Please use savepoints to migrate jobs. There are 
other internal APIs (the JobGraph itself isn't a stable API, either) that might 
cause problems in your upgrade process.
 # Create a savepoint of the job in the old version.
 # Start the Flink cluster with the upgraded Flink version.
 # Submit the job using the created savepoint to restart the job.

> upgrade to 1.19 with job in HA state with restart strategy crashes job manager
> --
>
> Key: FLINK-35639
> URL: https://issues.apache.org/jira/browse/FLINK-35639
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.20.0, 1.19.1
> Environment: Download 1.18 and 1.19 binary releases. Add the following 
> to flink-1.19.0/conf/config.yaml and flink-1.18.1/conf/flink-conf.yaml:
> ```yaml
> high-availability: zookeeper
> high-availability.zookeeper.quorum: localhost
> high-availability.storageDir: file:///tmp/flink/recovery
> ```
> Launch zookeeper: docker run --network host zookeeper:latest
> Launch the 1.18 task manager: ./flink-1.18.1/bin/taskmanager.sh start-foreground
> Launch the 1.18 job manager: ./flink-1.18.1/bin/jobmanager.sh start-foreground
> Launch the following job:
> ```java
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.time.Time;
> import java.util.concurrent.TimeUnit;
>
> public class FlinkJob {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.setRestartStrategy(
>                 RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, TimeUnit.SECONDS)));
>         env.fromElements("Hello World", "Hello Flink")
>                 .flatMap(new LineSplitter())
>                 .groupBy(0)
>                 .sum(1)
>                 .print();
>     }
>
>     public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
>             for (String word : value.split(" ")) {
>                 try {
>                     Thread.sleep(12);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 out.collect(new Tuple2<>(word, 1));
>             }
>         }
>     }
> }
> ```
> ```xml
> 4.0.0 org.apache.flink myflinkjob 1.0-SNAPSHOT 1.18.1 1.8 org.apache.flink
> flink-java ${flink.version} org.apache.flink flink-streaming-java
> ${flink.version} org.apache.maven.plugins maven-compiler-plugin 3.8.1
> ${java.version} ${java.version} org.apache.maven.plugins maven-jar-plugin
> 3.1.0 true lib/ FlinkJob
> ```
> Launch the job: ./flink-1.18.1/bin/flink run ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar
> Job has been submitted with JobID 5f0898c964a93a47aa480427f3e2c6c0
> Kill the job manager and task manager, then launch the 1.19.0 job manager: 
> ./flink-1.19.0/bin/jobmanager.sh start-foreground
> Root cause: it looks like the type of delayBetweenAttemptsInterval was changed 
> in 1.19 
> (https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239),
> introducing an incompat

[jira] [Closed] (FLINK-35639) upgrade to 1.19 with job in HA state with restart strategy crashes job manager

2024-06-19 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl closed FLINK-35639.
-
Resolution: Not A Problem

I'm closing the issue and the related PRs because we don't actually support 
this kind of version upgrade in general. Fixing the {{RestartStrategy}} issue 
wouldn't necessarily solve the problem.

> upgrade to 1.19 with job in HA state with restart strategy crashes job manager
> --
>
> Key: FLINK-35639
> URL: https://issues.apache.org/jira/browse/FLINK-35639
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.20.0, 1.19.1
> Environment: Download 1.18 and 1.19 binary releases. Add the following 
> to flink-1.19.0/conf/config.yaml and flink-1.18.1/conf/flink-conf.yaml:
> ```yaml
> high-availability: zookeeper
> high-availability.zookeeper.quorum: localhost
> high-availability.storageDir: file:///tmp/flink/recovery
> ```
> Launch zookeeper: docker run --network host zookeeper:latest
> Launch the 1.18 task manager: ./flink-1.18.1/bin/taskmanager.sh start-foreground
> Launch the 1.18 job manager: ./flink-1.18.1/bin/jobmanager.sh start-foreground
> Launch the following job:
> ```java
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.time.Time;
> import java.util.concurrent.TimeUnit;
>
> public class FlinkJob {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.setRestartStrategy(
>                 RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, TimeUnit.SECONDS)));
>         env.fromElements("Hello World", "Hello Flink")
>                 .flatMap(new LineSplitter())
>                 .groupBy(0)
>                 .sum(1)
>                 .print();
>     }
>
>     public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
>             for (String word : value.split(" ")) {
>                 try {
>                     Thread.sleep(12);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 out.collect(new Tuple2<>(word, 1));
>             }
>         }
>     }
> }
> ```
> ```xml
> 4.0.0 org.apache.flink myflinkjob 1.0-SNAPSHOT 1.18.1 1.8 org.apache.flink
> flink-java ${flink.version} org.apache.flink flink-streaming-java
> ${flink.version} org.apache.maven.plugins maven-compiler-plugin 3.8.1
> ${java.version} ${java.version} org.apache.maven.plugins maven-jar-plugin
> 3.1.0 true lib/ FlinkJob
> ```
> Launch the job: ./flink-1.18.1/bin/flink run ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar
> Job has been submitted with JobID 5f0898c964a93a47aa480427f3e2c6c0
> Kill the job manager and task manager, then launch the 1.19.0 job manager: 
> ./flink-1.19.0/bin/jobmanager.sh start-foreground
> Root cause: it looks like the type of delayBetweenAttemptsInterval was changed 
> in 1.19 
> (https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239),
> introducing an incompatibility which is not handled by Flink 1.19. In my 
> opinion, the job manager should not crash when starting in that case.
>Reporter: yazgoo
>Assignee: Matthias Pohl
>Priority: Blocker
>  Labels: pull-request-available
>
> When trying to upgrade a flink cluster from 1.18 to 1.19, with a 1.18 job in 
> zookeeper HA state, I have a jobmanager crash with a ClassCastException, see 
> log below  
>  
> {code:java}
> 2024-06-18 16:58:14,401 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
> occurred in the cluster entrypoint. org.apache.flink.util.FlinkException: 
> JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed.     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:738)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$7(Dispatcher.java:693)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) 
> ~[?:?]     at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
>  ~[?:?]     at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
>  ~[?:?]     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(P

[jira] [Updated] (FLINK-35639) upgrade to 1.19 with job in HA state with restart strategy crashes job manager

2024-06-19 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-35639:
--
Priority: Major  (was: Blocker)

> upgrade to 1.19 with job in HA state with restart strategy crashes job manager
> --
>
> Key: FLINK-35639
> URL: https://issues.apache.org/jira/browse/FLINK-35639
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.20.0, 1.19.1
> Environment: Download 1.18 and 1.19 binary releases. Add the following 
> to flink-1.19.0/conf/config.yaml and flink-1.18.1/conf/flink-conf.yaml:
> ```yaml
> high-availability: zookeeper
> high-availability.zookeeper.quorum: localhost
> high-availability.storageDir: file:///tmp/flink/recovery
> ```
> Launch zookeeper: docker run --network host zookeeper:latest
> Launch the 1.18 task manager: ./flink-1.18.1/bin/taskmanager.sh start-foreground
> Launch the 1.18 job manager: ./flink-1.18.1/bin/jobmanager.sh start-foreground
> Launch the following job:
> ```java
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.time.Time;
> import java.util.concurrent.TimeUnit;
>
> public class FlinkJob {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.setRestartStrategy(
>                 RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, TimeUnit.SECONDS)));
>         env.fromElements("Hello World", "Hello Flink")
>                 .flatMap(new LineSplitter())
>                 .groupBy(0)
>                 .sum(1)
>                 .print();
>     }
>
>     public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
>             for (String word : value.split(" ")) {
>                 try {
>                     Thread.sleep(12);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 out.collect(new Tuple2<>(word, 1));
>             }
>         }
>     }
> }
> ```
> ```xml
> 4.0.0 org.apache.flink myflinkjob 1.0-SNAPSHOT 1.18.1 1.8 org.apache.flink
> flink-java ${flink.version} org.apache.flink flink-streaming-java
> ${flink.version} org.apache.maven.plugins maven-compiler-plugin 3.8.1
> ${java.version} ${java.version} org.apache.maven.plugins maven-jar-plugin
> 3.1.0 true lib/ FlinkJob
> ```
> Launch the job: ./flink-1.18.1/bin/flink run ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar
> Job has been submitted with JobID 5f0898c964a93a47aa480427f3e2c6c0
> Kill the job manager and task manager, then launch the 1.19.0 job manager: 
> ./flink-1.19.0/bin/jobmanager.sh start-foreground
> Root cause: it looks like the type of delayBetweenAttemptsInterval was changed 
> in 1.19 
> (https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239),
> introducing an incompatibility which is not handled by Flink 1.19. In my 
> opinion, the job manager should not crash when starting in that case.
>Reporter: yazgoo
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
>
> When trying to upgrade a flink cluster from 1.18 to 1.19, with a 1.18 job in 
> zookeeper HA state, I have a jobmanager crash with a ClassCastException, see 
> log below  
>  
> {code:java}
> 2024-06-18 16:58:14,401 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
> occurred in the cluster entrypoint. org.apache.flink.util.FlinkException: 
> JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed.     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:738)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$7(Dispatcher.java:693)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) 
> ~[?:?]     at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
>  ~[?:?]     at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
>  ~[?:?]     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
>  ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingU

[jira] [Comment Edited] (FLINK-35639) upgrade to 1.19 with job in HA state with restart strategy crashes job manager

2024-06-19 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856230#comment-17856230
 ] 

Matthias Pohl edited comment on FLINK-35639 at 6/19/24 9:53 AM:


[~chesnay] pointed me to the actual issue. I was initially wondering why the 
change in FLINK-32570 was actually "overlooked" by our {{japicmp}} checks. The 
problem is that you're actually not following the supported process (as 
documented in the [Flink 
docs|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#restarting-streaming-applications]).
 That results in incompatibilities of internal APIs (the constructor in 
question is package-private). Please use savepoints to migrate jobs. There are 
other internal APIs (the JobGraph itself isn't a stable API, either) that might 
cause problems in your upgrade process.
 # Create a savepoint of the job in the old version.
 # Start the Flink cluster with the upgraded Flink version.
 # Submit the job using the created savepoint to restart the job using the job 
client of the new Flink binaries (to allow for proper JobGraph creation).


was (Author: mapohl):
[~chesnay] pointed me to the actual issue because I was wondering why the 
change in FLINK-32570 was actually "overlooked" by our {{japicmp}} checks. The 
problem is that you're actually not following the supported process (as 
documented in the [Flink 
docs|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#restarting-streaming-applications]).
 That results in incompatibilities of internal APIs (the constructor in 
question is package-private). Please use savepoints to migrate jobs. There are 
other internal APIs (the JobGraph itself isn't a stable API, either) that might 
cause problems in your upgrade process.
 # Create a savepoint of the job in the old version.
 # Start the Flink cluster with the upgraded Flink version.
 # Submit the job using the created savepoint to restart the job using the job 
client of the new Flink binaries (to allow for proper JobGraph creation).

> upgrade to 1.19 with job in HA state with restart strategy crashes job manager
> --
>
> Key: FLINK-35639
> URL: https://issues.apache.org/jira/browse/FLINK-35639
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.20.0, 1.19.1
> Environment: Download 1.18 and 1.19 binary releases. Add the following 
> to flink-1.19.0/conf/config.yaml and flink-1.18.1/conf/flink-conf.yaml:
> ```yaml
> high-availability: zookeeper
> high-availability.zookeeper.quorum: localhost
> high-availability.storageDir: file:///tmp/flink/recovery
> ```
> Launch zookeeper: docker run --network host zookeeper:latest
> Launch the 1.18 task manager: ./flink-1.18.1/bin/taskmanager.sh start-foreground
> Launch the 1.18 job manager: ./flink-1.18.1/bin/jobmanager.sh start-foreground
> Launch the following job:
> ```java
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.time.Time;
> import java.util.concurrent.TimeUnit;
>
> public class FlinkJob {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.setRestartStrategy(
>                 RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, TimeUnit.SECONDS)));
>         env.fromElements("Hello World", "Hello Flink")
>                 .flatMap(new LineSplitter())
>                 .groupBy(0)
>                 .sum(1)
>                 .print();
>     }
>
>     public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
>             for (String word : value.split(" ")) {
>                 try {
>                     Thread.sleep(12);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 out.collect(new Tuple2<>(word, 1));
>             }
>         }
>     }
> }
> ```
> ```xml
> 4.0.0 org.apache.flink myflinkjob 1.0-SNAPSHOT 1.18.1 1.8 org.apache.flink
> flink-java ${flink.version} org.apache.flink flink-streaming-java
> ${flink.version} org.apache.maven.plugins maven-compiler-plugin 3.8.1
> ${java.version} ${java.version} org.apache.maven.plugins maven-jar-plugin
> 3.1.0 true lib/ FlinkJob
> ```
> Launch the job: ./flink-1.18.1/bin/flink run ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar
> Job has been submitted with JobID 5f0898c964a93a47aa480427f3e2c6c0
> Kill the job manager and task manager, then launch the 1.19.0 job manager: 
> ./flink-1.19.0/bin/jobmanager.sh start-foreground
> Root cause: it looks like the type of delayBetweenAttemptsInterval was changed 
> in 1.19 
> https://github.com/apache/flink/pull/22984/files#diff-d174f3

[jira] [Closed] (FLINK-35629) Performance regression in stringRead and stringWrite

2024-06-19 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan closed FLINK-35629.
---
Resolution: Won't Fix

> Performance regression in stringRead and stringWrite
> 
>
> Key: FLINK-35629
> URL: https://issues.apache.org/jira/browse/FLINK-35629
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
> Fix For: 1.20.0
>
> Attachments: image-2024-06-18-14-52-55-164.png
>
>
> [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.ascii&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.chinese&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.russian&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.128.chinese&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.ascii&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.chinese&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.russian&extr=on&quarts=on&equid=off&env=3&revs=200]
> !image-2024-06-18-14-52-55-164.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-16784) Support KeyedBroadcastProcessFunction state bootstrapping.

2024-06-19 Thread Alexander Fedulov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Fedulov reassigned FLINK-16784:
-

Assignee: Or Keren

> Support KeyedBroadcastProcessFunction state bootstrapping. 
> ---
>
> Key: FLINK-16784
> URL: https://issues.apache.org/jira/browse/FLINK-16784
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Reporter: Seth Wiesman
>Assignee: Or Keren
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-25646) Document buffer debloating issues with high parallelism

2024-06-19 Thread Jufang He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856252#comment-17856252
 ] 

Jufang He edited comment on FLINK-25646 at 6/19/24 11:55 AM:
-

[~pnowojski] I would like to ask if there is any progress on this issue.
I also saw significant performance degradation when testing buffer debloating 
(with Unaligned Checkpoint enabled).

The following is some information about my test jobs. The test Kafka source QPS 
is high, and the test jobs are all under backpressure and have lag. With buffer 
debloating enabled, the buffer size is smaller, the number of segments used 
increases, and the total amount of in-flight data decreases significantly, but 
the negative effect is also obvious: the throughput of the task drops by more 
than 30%.
|| ||*Buffer Debloating Disabled*||*Buffer Debloating Enabled*||
|parallelism|1350|1350|
|checkpoint duration (avg)|2m 43s|2m 31s|
|channel state size (avg)|65.9 GB|5.43 GB|
|memory segment size|32 KB (default value)|256 B~5 KB (calculated by buffer debloating; most are 256 B)|
|total memory segments per TM|115k|115k|
|available memory segments per TM|63.2k|19.5k|
|floating buffers per gate|8 (default value)|2000|
|*throughput (avg)*|374k|249k|

 


was (Author: JIRAUSER302059):
[~pnowojski] I would like to ask if there is any progress on this issue.
I also saw significant performance degradation when testing buffer debloating 
(with Unaligned Checkpoint enabled).

The following is some information about my test jobs. The test Kafka source QPS 
is high, and the test jobs are all under backpressure and have lag. With buffer 
debloating enabled, the buffer size is smaller, the number of segments used 
increases, and the total amount of in-flight data decreases significantly, but 
the negative effect is also obvious: the throughput of the task drops by more 
than 30%.
|| ||*Buffer Debloating Disabled*||*Buffer Debloating Enabled*||
|parallelism|1350|1350|
|checkpoint duration (avg)|2m 43s|2m 31s|
|channel state size (avg)|65.9 GB|5.43 GB|
|memory segment size|32 KB (default value)|256 B~5 KB (calculated by buffer debloating; most are 256 B)|
|total memory segments per TM|115k|115k|
|available memory segments per TM|63.2k|19.5k|
|floating buffers per gate|8 (default value)|2000|
|*throughput (avg)*|374k|249k|

 

 

> Document buffer debloating issues with high parallelism
> ---
>
> Key: FLINK-25646
> URL: https://issues.apache.org/jira/browse/FLINK-25646
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Anton Kalashnikov
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> According to recent benchmarks, there are some problems with buffer debloating 
> when a job has high parallelism. What counts as high parallelism differs from 
> job to job, but in general it is more than 200. So it makes sense to document 
> that problem and propose the solution: increasing the number of buffers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25646) Document buffer debloating issues with high parallelism

2024-06-19 Thread Jufang He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856252#comment-17856252
 ] 

Jufang He commented on FLINK-25646:
---

[~pnowojski] I would like to ask if there is any progress on this issue.
I also saw significant performance degradation when testing buffer debloating 
(with Unaligned Checkpoint enabled).

The following is some information about my test jobs. The test Kafka source QPS 
is high, and the test jobs are all under backpressure and have lag. With buffer 
debloating enabled, the buffer size is smaller, the number of segments used 
increases, and the total amount of in-flight data decreases significantly, but 
the negative effect is also obvious: the throughput of the task drops by more 
than 30%.
|| ||*Buffer Debloating Disabled*||*Buffer Debloating Enabled*||
|parallelism|1350|1350|
|checkpoint duration (avg)|2m 43s|2m 31s|
|channel state size (avg)|65.9 GB|5.43 GB|
|memory segment size|32 KB (default value)|256 B~5 KB (calculated by buffer debloating; most are 256 B)|
|total memory segments per TM|115k|115k|
|available memory segments per TM|63.2k|19.5k|
|floating buffers per gate|8 (default value)|2000|
|*throughput (avg)*|374k|249k|

 

 

> Document buffer debloating issues with high parallelism
> ---
>
> Key: FLINK-25646
> URL: https://issues.apache.org/jira/browse/FLINK-25646
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Anton Kalashnikov
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> According to recent benchmarks, there are some problems with buffer debloating 
> when a job has high parallelism. What counts as high parallelism differs from 
> job to job, but in general it is more than 200. So it makes sense to document 
> that problem and propose the solution: increasing the number of buffers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-25646) Document buffer debloating issues with high parallelism

2024-06-19 Thread Jufang He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856252#comment-17856252
 ] 

Jufang He edited comment on FLINK-25646 at 6/19/24 11:56 AM:
-

[~pnowojski] I would like to ask if there is any progress on this issue 
(FLINK-25688).
I also saw significant performance degradation when testing buffer debloating 
(with Unaligned Checkpoint enabled).

The following is some information about my test jobs. The test Kafka source QPS 
is high, and the test jobs are all under backpressure and have lag. With buffer 
debloating enabled, the buffer size is smaller, the number of segments used 
increases, and the total amount of in-flight data decreases significantly, but 
the negative effect is also obvious: the throughput of the task drops by more 
than 30%.
|| ||*Buffer Debloating Disabled*||*Buffer Debloating Enabled*||
|parallelism|1350|1350|
|checkpoint duration (avg)|2m 43s|2m 31s|
|channel state size (avg)|65.9 GB|5.43 GB|
|memory segment size|32 KB (default value)|256 B~5 KB (calculated by buffer debloating; most are 256 B)|
|total memory segments per TM|115k|115k|
|available memory segments per TM|63.2k|19.5k|
|floating buffers per gate|8 (default value)|2000|
|*throughput (avg)*|374k|249k|

 


was (Author: JIRAUSER302059):
[~pnowojski] I would like to ask if there is any progress on this issue.
I also saw significant performance degradation when testing buffer debloating 
(with Unaligned Checkpoint enabled).

The following is some information about my test jobs. The test Kafka source QPS 
is high, and the test jobs are all under backpressure and have lag. With buffer 
debloating enabled, the buffer size is smaller, the number of segments used 
increases, and the total amount of in-flight data decreases significantly, but 
the negative effect is also obvious: the throughput of the task drops by more 
than 30%.
|| ||*Buffer Debloating Disabled*||*Buffer Debloating Enabled*||
|parallelism|1350|1350|
|checkpoint duration (avg)|2m 43s|2m 31s|
|channel state size (avg)|65.9 GB|5.43 GB|
|memory segment size|32 KB (default value)|256 B~5 KB (calculated by buffer debloating; most are 256 B)|
|total memory segments per TM|115k|115k|
|available memory segments per TM|63.2k|19.5k|
|floating buffers per gate|8 (default value)|2000|
|*throughput (avg)*|374k|249k|

 

> Document buffer debloating issues with high parallelism
> ---
>
> Key: FLINK-25646
> URL: https://issues.apache.org/jira/browse/FLINK-25646
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Anton Kalashnikov
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> According to recent benchmarks, there are some problems with buffer debloating 
> when a job has high parallelism. What counts as high parallelism differs from 
> job to job, but in general it is more than 200. So it makes sense to document 
> that problem and propose the solution: increasing the number of buffers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25646) Document buffer debloating issues with high parallelism

2024-06-19 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856273#comment-17856273
 ] 

Piotr Nowojski commented on FLINK-25646:


Hi [~hejufang001]. No, there has been no progress on this issue.

> floating buffers per gate | 8 (default value) | 2000

Does this mean that with buffer debloating enabled, you have increased the 
floating buffers per gate from 8 to 2000?

Also, have you tried setting a higher debloating target? AFAIR we have observed 
that increasing the target from 1s (default) to 5s behaves much better when it 
comes to max throughput.
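For reference, a minimal sketch of what raising the target looks like when configuring an environment in Java; the two keys are Flink's documented buffer debloat options, and the 5 s value is just the figure mentioned above:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DebloatTargetExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");
        // Raise the target from the 1 s default to 5 s, trading some in-flight
        // data reduction for throughput.
        conf.setString("taskmanager.network.memory.buffer-debloat.target", "5 s");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements(1, 2, 3).print();
        env.execute("debloat-target-example");
    }
}
{code}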

> Document buffer debloating issues with high parallelism
> ---
>
> Key: FLINK-25646
> URL: https://issues.apache.org/jira/browse/FLINK-25646
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Anton Kalashnikov
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> According to recent benchmarks, there are some problems with buffer debloating 
> when a job has high parallelism. What counts as high parallelism differs from 
> job to job, but in general it is more than 200. So it makes sense to document 
> that problem and propose the solution: increasing the number of buffers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35553) Integrate newly added trigger interface with checkpointing

2024-06-19 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl reassigned FLINK-35553:
-

Assignee: Matthias Pohl

> Integrate newly added trigger interface with checkpointing
> --
>
> Key: FLINK-35553
> URL: https://issues.apache.org/jira/browse/FLINK-35553
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
>
> This connects the newly introduced trigger logic (FLINK-35551) with the 
> {{CheckpointStatsTracker}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35552) Move CheckpointStatsTracker out of ExecutionGraph into Scheduler

2024-06-19 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl reassigned FLINK-35552:
-

Assignee: Matthias Pohl

> Move CheckpointStatsTracker out of ExecutionGraph into Scheduler
> 
>
> Key: FLINK-35552
> URL: https://issues.apache.org/jira/browse/FLINK-35552
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
>
> The scheduler needs to know about the CheckpointStatsTracker to allow 
> listening to checkpoint failures and completion.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35551) Introduces RescaleManager#onTrigger endpoint

2024-06-19 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl reassigned FLINK-35551:
-

Assignee: Matthias Pohl

> Introduces RescaleManager#onTrigger endpoint
> 
>
> Key: FLINK-35551
> URL: https://issues.apache.org/jira/browse/FLINK-35551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
>
> The new endpoint would allow us to separate observing change events from 
> actually triggering the rescale operation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25538) [JUnit5 Migration] Module: flink-connector-kafka

2024-06-19 Thread Muhammet Orazov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856294#comment-17856294
 ] 

Muhammet Orazov commented on FLINK-25538:
-

Hey all,

I have created a new PR 
[https://github.com/apache/flink-connector-kafka/pull/106] (continuing from PR 
[https://github.com/apache/flink-connector-kafka/pull/66]).

Please feel free to close PR #66.

I'll continue applying changes on #106.

> [JUnit5 Migration] Module: flink-connector-kafka
> 
>
> Key: FLINK-25538
> URL: https://issues.apache.org/jira/browse/FLINK-25538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Qingsheng Ren
>Assignee: Muhammet Orazov
>Priority: Minor
>  Labels: pull-request-available, stale-assigned, starter
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35650) Incorrect TIMESTAMP_LTZ type behavior in Table SQL

2024-06-19 Thread Andrey Gaskov (Jira)
Andrey Gaskov created FLINK-35650:
-

 Summary: Incorrect TIMESTAMP_LTZ type behavior in Table SQL
 Key: FLINK-35650
 URL: https://issues.apache.org/jira/browse/FLINK-35650
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API, Table SQL / Client, Table SQL / Runtime
Affects Versions: 1.18.1, 1.17.2, 1.20.0
 Environment: Local environment, Open Source Flink without 
modifications, the cluster started by ./bin/start-cluster.sh
Reporter: Andrey Gaskov


The file named /home/miron/tmp/data.csv contains a single line:
{code:java}
"1970-01-01 00:00:00Z" {code}
Run the following commands in Flink SQL client:
{code:java}
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeeded.

Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeeded.

Flink SQL> SET 'execution.runtime-mode' = 'batch';
[INFO] Execute statement succeeded.

Flink SQL> 
> create table t_in (
>   t timestamp_ltz
> ) with (
>   'connector' = 'filesystem',
>   'path' = '/home/miron/tmp/data.csv',
>   'format' = 'csv'
> );
[INFO] Execute statement succeeded.

Flink SQL> select * from t_in;
+----------------------------+
|                          t |
+----------------------------+
|     1970-01-01 08:00:00.00 |
+----------------------------+
1 row in set (1.33 seconds)
{code}
So far so good. The behavior corresponds to the specification.

Run the following query:
{code:java}
Flink SQL> select TO_TIMESTAMP_LTZ(0, 0);
+-------------------------+
|                  EXPR$0 |
+-------------------------+
| 1970-01-01 08:00:00.000 |
+-------------------------+
1 row in set (0.36 seconds)
{code}
This is also correct. The zero point on the timeline corresponds to 1970-01-01 
00:00:00 at zero UTC offset, which corresponds to 1970-01-01 08:00:00 in the 
Asia/Shanghai time zone.

Now things get worse:
{code:java}
Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(0, 0);
Empty set (0.47 seconds) {code}
*{color:#de350b}This is wrong.{color}* We should get the record as a result.

We could fix it the following way:
{code:java}
Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(8*60*60, 0);
+----------------------------+
|                          t |
+----------------------------+
|     1970-01-01 08:00:00.00 |
+----------------------------+
1 row in set (0.37 seconds) {code}
Even though we got the record, we should not have to pass the 8*60*60 offset to 
TO_TIMESTAMP_LTZ.

But the most ridiculous result is the following:
{code:java}
Flink SQL> select * from t_in where t = TO_TIMESTAMP_LTZ(8*60*60, 0);
+----------------------------+
|                          t |
+----------------------------+
|     1970-01-01 16:00:00.00 |
+----------------------------+
1 row in set (0.37 seconds) {code}
*{color:#de350b}This is absolutely wrong.{color}* By changing the comparison 
function from "<=" to "=" in the WHERE clause, we got a wrong time.

We get the same behavior in Java: the result is an object of the Instant class 
with a wrong value. Also, in Java I got more wrong cases that could not be 
reproduced using the SQL Client.
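To make the expected semantics concrete, here is a minimal Java sketch using plain java.time (no Flink APIs): the CSV value and TO_TIMESTAMP_LTZ(0, 0) denote the same instant, so both <= and = should match:

{code:java}
import java.time.Instant;

public class LtzComparison {
    public static void main(String[] args) {
        // The CSV value "1970-01-01 00:00:00Z" denotes this point on the timeline.
        Instant fromCsv = Instant.parse("1970-01-01T00:00:00Z");
        // TO_TIMESTAMP_LTZ(0, 0) denotes epoch second 0, the same point.
        Instant epochZero = Instant.ofEpochSecond(0);

        System.out.println(fromCsv.equals(epochZero));          // true
        System.out.println(fromCsv.compareTo(epochZero) <= 0);  // true
        // The session time zone only affects how the instant is rendered,
        // not the instant itself, so comparisons must not depend on it.
    }
}
{code}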

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35650) Incorrect TIMESTAMP_LTZ type behavior in Table SQL

2024-06-19 Thread Andrey Gaskov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey Gaskov updated FLINK-35650:
--
Description: 
The file named /home/miron/tmp/data.csv contains a single line:
{code:java}
"1970-01-01 00:00:00Z" {code}
 

Run the following commands in Flink SQL client:
{code:java}
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeeded.

Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeeded.

Flink SQL> SET 'execution.runtime-mode' = 'batch';
[INFO] Execute statement succeeded.

Flink SQL> 
> create table t_in (
>   t timestamp_ltz
> ) with (
>   'connector' = 'filesystem',
>   'path' = '/home/miron/tmp/data.csv',
>   'format' = 'csv'
> );
[INFO] Execute statement succeeded.

Flink SQL> select * from t_in;
++
|                          t |
++
| 1970-01-01 08:00:00.00 |
++
1 row in set (1.33 seconds)
{code}
So far so good. The behavior corresponds to the specification.

 

Run the following query:
{code:java}
Flink SQL> select TO_TIMESTAMP_LTZ(0, 0);
+-+
|                  EXPR$0 |
+-+
| 1970-01-01 08:00:00.000 |
+-+
1 row in set (0.36 seconds)
{code}
This is also correct. Zero point on the timeline corresponds 1970-01-01 
00:00:00 at zero UTC offset which corresponds to 1970-01-01 08:00:00 at 
Asia/Shanghai time zone.

 

Now things get worse:
{code:java}
Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(0, 0);
Empty set (0.47 seconds) {code}
*{color:#de350b}This is wrong.{color}* We should get the record as a result.

 

We could fix it the following way:
{code:java}
Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(8*60*60, 0);
+----------------------------+
|                          t |
+----------------------------+
| 1970-01-01 08:00:00.000000 |
+----------------------------+
1 row in set (0.37 seconds) {code}
Even though we got the record, we should not have to pass 8*60*60 as the 
argument to TO_TIMESTAMP_LTZ.

 

But the most ridiculous result is the following:
{code:java}
Flink SQL> select * from t_in where t = TO_TIMESTAMP_LTZ(8*60*60, 0);
+----------------------------+
|                          t |
+----------------------------+
| 1970-01-01 16:00:00.000000 |
+----------------------------+
1 row in set (0.37 seconds) {code}
*{color:#de350b}This is absolutely wrong.{color}* By changing the comparison 
function from "<=" to "=" in the where clause we got the wrong time (16:00 
instead of 08:00).

 

We get the same behavior in Java: the result is an Instant object with the 
wrong value. In Java I also hit more wrong cases that could not be reproduced 
using the SQL Client.


[jira] [Commented] (FLINK-35650) Incorrect TIMESTAMP_LTZ type behavior in Table SQL

2024-06-19 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856316#comment-17856316
 ] 

Dawid Wysakowicz commented on FLINK-35650:
--

I think it is most likely related to 
https://issues.apache.org/jira/browse/FLINK-34938

> Incorrect TIMESTAMP_LTZ type behavior in Table SQL
> --
>
> Key: FLINK-35650
> URL: https://issues.apache.org/jira/browse/FLINK-35650
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Client, Table SQL / Runtime
>Affects Versions: 1.17.2, 1.18.1, 1.20.0
> Environment: Local environment, Open Source Flink without 
> modifications, the cluster started by ./bin/start-cluster.sh
>Reporter: Andrey Gaskov
>Priority: Critical
>
> The file named /home/miron/tmp/data.csv contains a single line:
> {code:java}
> "1970-01-01 00:00:00Z" {code}
>  
> Run the following commands in Flink SQL client:
> {code:java}
> Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
> [INFO] Execute statement succeeded.
> Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
> [INFO] Execute statement succeeded.
> Flink SQL> SET 'execution.runtime-mode' = 'batch';
> [INFO] Execute statement succeeded.
> Flink SQL> 
> > create table t_in (
> >   t timestamp_ltz
> > ) with (
> >   'connector' = 'filesystem',
> >   'path' = '/home/miron/tmp/data.csv',
> >   'format' = 'csv'
> > );
> [INFO] Execute statement succeeded.
> Flink SQL> select * from t_in;
> +----------------------------+
> |                          t |
> +----------------------------+
> | 1970-01-01 08:00:00.000000 |
> +----------------------------+
> 1 row in set (1.33 seconds)
> {code}
> So far so good. The behavior corresponds to the specification.
>  
> Run the following query:
> {code:java}
> Flink SQL> select TO_TIMESTAMP_LTZ(0, 0);
> +-------------------------+
> |                  EXPR$0 |
> +-------------------------+
> | 1970-01-01 08:00:00.000 |
> +-------------------------+
> 1 row in set (0.36 seconds)
> {code}
> This is also correct. The zero point on the timeline corresponds to 
> 1970-01-01 00:00:00 at zero UTC offset, which is 1970-01-01 08:00:00 in the 
> Asia/Shanghai time zone.
>  
> Now things get worse:
> {code:java}
> Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(0, 0);
> Empty set (0.47 seconds) {code}
> *{color:#de350b}This is wrong.{color}* We should get the record as a result.
>  
> We could fix it the following way:
> {code:java}
> Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(8*60*60, 0);
> +----------------------------+
> |                          t |
> +----------------------------+
> | 1970-01-01 08:00:00.000000 |
> +----------------------------+
> 1 row in set (0.37 seconds) {code}
> Even though we got the record, we should not have to pass 8*60*60 as the 
> argument to TO_TIMESTAMP_LTZ.
>  
> But the most ridiculous result is the following:
> {code:java}
> Flink SQL> select * from t_in where t = TO_TIMESTAMP_LTZ(8*60*60, 0);
> +----------------------------+
> |                          t |
> +----------------------------+
> | 1970-01-01 16:00:00.000000 |
> +----------------------------+
> 1 row in set (0.37 seconds) {code}
> *{color:#de350b}This is absolutely wrong.{color}* By changing the comparison 
> function from "<=" to "=" in the where clause we got the wrong time ({*}16:00 
> instead of 08:00{*}).
>  
> We get the same behavior in Java: the result is an Instant object with the 
> wrong value. In Java I also hit more wrong cases that could not be 
> reproduced using the SQL Client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33142) Prometheus Sink Connector - Update Documentation

2024-06-19 Thread Lorenzo Nicora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856331#comment-17856331
 ] 

Lorenzo Nicora commented on FLINK-33142:


[~Hong Teoh] Can you please assign this to me? I will start working on the docs.

> Prometheus Sink Connector - Update Documentation
> 
>
> Key: FLINK-33142
> URL: https://issues.apache.org/jira/browse/FLINK-33142
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Lorenzo Nicora
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35651) Unable to call argument-less built-in functions with parentheses

2024-06-19 Thread yux (Jira)
yux created FLINK-35651:
---

 Summary: Unable to call argument-less built-in functions with 
parentheses
 Key: FLINK-35651
 URL: https://issues.apache.org/jira/browse/FLINK-35651
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Currently, the CDC Pipeline transform module handles some built-in functions 
specially, including LOCALTIME, LOCALTIMESTAMP, CURRENT_TIME, CURRENT_DATE, and 
CURRENT_TIMESTAMP.

In the SystemFunctionUtils implementation, these functions actually take 
arguments, which are silently bound to the current __epoch_time__ and 
__time_zone__ variables. However, users cannot call these functions with 
parentheses directly because the function signature doesn't match.

For example,

```yaml
transform:
  - projection: CURRENT_TIMESTAMP AS TS
```

is OK since it was automatically expanded to something like 
`currentTimestamp(__epoch_time__, __time_zone__)`.

But for this definition:

```yaml
transform:
  - projection: CURRENT_TIMESTAMP() AS TS
```

The transform will fail at runtime:

> org.apache.calcite.runtime.CalciteContextException: From line 1, column 12 to 
> line 1, column 30: No match found for function signature CURRENT_TIMESTAMP()

which could be quite confusing to users.
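
Conceptually, the mismatch looks like this (a hypothetical sketch; the actual 
SystemFunctionUtils signatures may differ):
{code:java}
// Assumed shape, for illustration only: the transform planner rewrites a bare
// CURRENT_TIMESTAMP into a call that passes the bound variables, so the
// runtime method takes two arguments.
public static java.time.LocalDateTime currentTimestamp(long epochTime, String timeZone) {
    return java.time.Instant.ofEpochMilli(epochTime)
            .atZone(java.time.ZoneId.of(timeZone))
            .toLocalDateTime();
}
// A literal CURRENT_TIMESTAMP() in the projection is resolved against a
// zero-argument signature, which does not exist, hence Calcite's
// "No match found for function signature CURRENT_TIMESTAMP()" error.
{code}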



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35652) FLIP-462: Support Custom Data Distribution for Input Stream of Lookup Join

2024-06-19 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35652:
---
Description: Ticket for [FLIP-462 Support Custom Data Distribution for 
Input Stream of Lookup 
Join|https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join]

> FLIP-462: Support Custom Data Distribution for Input Stream of Lookup Join
> --
>
> Key: FLINK-35652
> URL: https://issues.apache.org/jira/browse/FLINK-35652
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: Weijie Guo
>Priority: Major
>
> Ticket for [FLIP-462 Support Custom Data Distribution for Input Stream of 
> Lookup 
> Join|https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35652) FLIP-462: Support Custom Data Distribution for Input Stream of Lookup Join

2024-06-19 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35652:
--

 Summary: FLIP-462: Support Custom Data Distribution for Input 
Stream of Lookup Join
 Key: FLINK-35652
 URL: https://issues.apache.org/jira/browse/FLINK-35652
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35653) "fraud detection" example missed "env.execute" explanation

2024-06-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856372#comment-17856372
 ] 

Luke Chen commented on FLINK-35653:
---

Working on the fix.

> "fraud detection" example missed "env.execute" explanation
> --
>
> Key: FLINK-35653
> URL: https://issues.apache.org/jira/browse/FLINK-35653
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Luke Chen
>Priority: Major
>
> In "try flink" page, there is a [fraud detection with the dataStream 
> API|https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/datastream/]
>  example to demo how to build a stateful streaming application with Flink’s 
> DataStream API. In this page, we explained the code line by line, but missed 
> the last line:
> {code:java}
> env.execute("Fraud Detection");{code}
>  
> I confirmed this existed before 
> [v1.12|https://github.com/apache/flink/blob/release-1.12/docs/try-flink/datastream_api.md?plain=1#L365],
>  but when we migrated the Flink docs from Jekyll to Hugo in this 
> [PR|https://github.com/apache/flink/pull/14903], we missed that section. This 
> omission means first-time Flink users may miss the crucial line that actually 
> executes the Flink job.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35653) "fraud detection" example missed "env.execute" explanation

2024-06-19 Thread Luke Chen (Jira)
Luke Chen created FLINK-35653:
-

 Summary: "fraud detection" example missed "env.execute" explanation
 Key: FLINK-35653
 URL: https://issues.apache.org/jira/browse/FLINK-35653
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Luke Chen


In "try flink" page, there is a [fraud detection with the dataStream 
API|https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/datastream/]
 example to demo how to build a stateful streaming application with Flink’s 
DataStream API. In this page, we explained the code line by line, but missed 
the last line:
{code:java}
env.execute("Fraud Detection");{code}
 

I confirmed this existed before 
[v1.12|https://github.com/apache/flink/blob/release-1.12/docs/try-flink/datastream_api.md?plain=1#L365],
 but when we migrated the Flink docs from Jekyll to Hugo in this 
[PR|https://github.com/apache/flink/pull/14903], we missed that section. This 
omission means first-time Flink users may miss the crucial line that actually 
executes the Flink job.
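
For context, a minimal sketch of a complete DataStream program: without the 
final env.execute call the pipeline is only defined, never submitted.
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FraudDetectionSkeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Building the topology only declares it; nothing runs yet.
        env.fromElements("tx-1", "tx-2", "tx-3").print();

        // This line actually submits the job for execution.
        env.execute("Fraud Detection");
    }
}
{code}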

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35653) "fraud detection" example missed "env.execute" explanation

2024-06-19 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated FLINK-35653:
--
Description: 
In "try flink" page, there is a [fraud detection with the dataStream 
API|https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/datastream/#breaking-down-the-code]
 example to demo how to build a stateful streaming application with Flink’s 
DataStream API. In this page, we explained the code line by line, but missed 
the last line:
{code:java}
env.execute("Fraud Detection");{code}
 

I confirmed this existed before 
[v1.12|https://github.com/apache/flink/blob/release-1.12/docs/try-flink/datastream_api.md?plain=1#L365],
 but when we migrated the Flink docs from Jekyll to Hugo in this 
[PR|https://github.com/apache/flink/pull/14903], we missed that section. This 
omission means first-time Flink users may miss the crucial line that actually 
executes the Flink job.

 

 

  was:
In "try flink" page, there is a [fraud detection with the dataStream 
API|https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/datastream/]
 example to demo how to build a stateful streaming application with Flink’s 
DataStream API. In this page, we explained the code line by line, but missed 
the last line:
{code:java}
env.execute("Fraud Detection");{code}
 

I confirmed this existed before 
[v1.12|https://github.com/apache/flink/blob/release-1.12/docs/try-flink/datastream_api.md?plain=1#L365],
 but when we migrated the Flink docs from Jekyll to Hugo in this 
[PR|https://github.com/apache/flink/pull/14903], we missed that section. This 
omission means first-time Flink users may miss the crucial line that actually 
executes the Flink job.

 

 


> "fraud detection" example missed "env.execute" explanation
> --
>
> Key: FLINK-35653
> URL: https://issues.apache.org/jira/browse/FLINK-35653
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Luke Chen
>Priority: Major
>
> In "try flink" page, there is a [fraud detection with the dataStream 
> API|https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/datastream/#breaking-down-the-code]
>  example to demo how to build a stateful streaming application with Flink’s 
> DataStream API. In this page, we explained the code line by line, but missed 
> the last line:
> {code:java}
> env.execute("Fraud Detection");{code}
>  
> I confirmed this existed before 
> [v1.12|https://github.com/apache/flink/blob/release-1.12/docs/try-flink/datastream_api.md?plain=1#L365],
>  but when we migrated the Flink docs from Jekyll to Hugo in this 
> [PR|https://github.com/apache/flink/pull/14903], we missed that section. This 
> omission means first-time Flink users may miss the crucial line that actually 
> executes the Flink job.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33936) Outputting Identical Results in Mini-Batch Aggregation with Set TTL

2024-06-19 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856374#comment-17856374
 ] 

Weijie Guo edited comment on FLINK-33936 at 6/20/24 2:56 AM:
-

master(1.20) via d661ec2983ef249911698a1d7ebbb7ec58a9e62a.


was (Author: weijie guo):
master(1.20) via 
https://github.com/apache/flink/commit/d661ec2983ef249911698a1d7ebbb7ec58a9e62a.

> Outputting Identical Results in Mini-Batch Aggregation with Set TTL
> ---
>
> Key: FLINK-33936
> URL: https://issues.apache.org/jira/browse/FLINK-33936
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Feng Jin
>Assignee: Feng Jin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Currently, if mini-batch is enabled and the aggregated result is the same as 
> the previous output, the current aggregation result will not be sent 
> downstream. This causes downstream nodes to not receive updated data, and if 
> a state TTL is set, the TTL of the downstream state will not be refreshed 
> either.
> The specific logic is as follows.
> https://github.com/apache/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224
> {code:java}
> if (!equaliser.equals(prevAggValue, newAggValue)) {
> // new row is not same with prev row
> if (generateUpdateBefore) {
> // prepare UPDATE_BEFORE message for previous row
> resultRow
> .replace(currentKey, prevAggValue)
> .setRowKind(RowKind.UPDATE_BEFORE);
> out.collect(resultRow);
> }
> // prepare UPDATE_AFTER message for new row
> resultRow.replace(currentKey, 
> newAggValue).setRowKind(RowKind.UPDATE_AFTER);
> out.collect(resultRow);
> }
> // new row is same with prev row, no need to output
> {code}
> When mini-batch is not enabled, even if this aggregation result is the same 
> as the last one, new results are still sent if a TTL is set.
> https://github.com/apache/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170
> {code:java}
> if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, 
> newAggValue)) {
> // newRow is the same as before and state cleaning is not 
> enabled.
> // We do not emit retraction and acc message.
> // If state cleaning is enabled, we have to emit messages 
> to prevent too early
> // state eviction of downstream operators.
> return;
> } else {
> // retract previous result
> if (generateUpdateBefore) {
> // prepare UPDATE_BEFORE message for previous row
> resultRow
> .replace(currentKey, prevAggValue)
> .setRowKind(RowKind.UPDATE_BEFORE);
> out.collect(resultRow);
> }
> // prepare UPDATE_AFTER message for new row
> resultRow.replace(currentKey, 
> newAggValue).setRowKind(RowKind.UPDATE_AFTER);
> }
> {code}
> Therefore, considering TTL scenarios, I believe that when mini-batch 
> aggregation is enabled, new results should also be emitted when the 
> aggregated result is the same as the previous one.
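
In other words, the proposal is to gate the suppression on TTL in the 
mini-batch path as well, mirroring GroupAggFunction (a sketch, assuming 
MiniBatchGroupAggFunction has access to the same fields):
{code:java}
// Sketch only: suppress the unchanged output only when no state TTL is
// configured, as GroupAggFunction already does.
if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
    // Row unchanged and no TTL: safe to emit nothing.
} else {
    if (generateUpdateBefore) {
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(resultRow);
    }
    resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
    out.collect(resultRow);
}
{code}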



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33936) Outputting Identical Results in Mini-Batch Aggregation with Set TTL

2024-06-19 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-33936.
--
Fix Version/s: 1.20.0
   Resolution: Fixed

master(1.20) via 
https://github.com/apache/flink/commit/d661ec2983ef249911698a1d7ebbb7ec58a9e62a.

> Outputting Identical Results in Mini-Batch Aggregation with Set TTL
> ---
>
> Key: FLINK-33936
> URL: https://issues.apache.org/jira/browse/FLINK-33936
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Feng Jin
>Assignee: Feng Jin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Currently, if mini-batch is enabled and the aggregated result is the same as 
> the previous output, the current aggregation result will not be sent 
> downstream. This causes downstream nodes to not receive updated data, and if 
> a state TTL is set, the TTL of the downstream state will not be refreshed 
> either.
> The specific logic is as follows.
> https://github.com/apache/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224
> {code:java}
> if (!equaliser.equals(prevAggValue, newAggValue)) {
> // new row is not same with prev row
> if (generateUpdateBefore) {
> // prepare UPDATE_BEFORE message for previous row
> resultRow
> .replace(currentKey, prevAggValue)
> .setRowKind(RowKind.UPDATE_BEFORE);
> out.collect(resultRow);
> }
> // prepare UPDATE_AFTER message for new row
> resultRow.replace(currentKey, 
> newAggValue).setRowKind(RowKind.UPDATE_AFTER);
> out.collect(resultRow);
> }
> // new row is same with prev row, no need to output
> {code}
> When mini-batch is not enabled, even if this aggregation result is the same 
> as the last one, new results are still sent if a TTL is set.
> https://github.com/apache/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170
> {code:java}
> if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, 
> newAggValue)) {
> // newRow is the same as before and state cleaning is not 
> enabled.
> // We do not emit retraction and acc message.
> // If state cleaning is enabled, we have to emit messages 
> to prevent too early
> // state eviction of downstream operators.
> return;
> } else {
> // retract previous result
> if (generateUpdateBefore) {
> // prepare UPDATE_BEFORE message for previous row
> resultRow
> .replace(currentKey, prevAggValue)
> .setRowKind(RowKind.UPDATE_BEFORE);
> out.collect(resultRow);
> }
> // prepare UPDATE_AFTER message for new row
> resultRow.replace(currentKey, 
> newAggValue).setRowKind(RowKind.UPDATE_AFTER);
> }
> {code}
> Therefore, considering TTL scenarios, I believe that when mini-batch 
> aggregation is enabled, new results should also be emitted when the 
> aggregated result is the same as the previous one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35653) "fraud detection" example missed "env.execute" explanation

2024-06-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856375#comment-17856375
 ] 

Luke Chen commented on FLINK-35653:
---

PR: https://github.com/apache/flink/pull/24959

> "fraud detection" example missed "env.execute" explanation
> --
>
> Key: FLINK-35653
> URL: https://issues.apache.org/jira/browse/FLINK-35653
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Luke Chen
>Priority: Major
>
> In "try flink" page, there is a [fraud detection with the dataStream 
> API|https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/datastream/#breaking-down-the-code]
>  example to demo how to build a stateful streaming application with Flink’s 
> DataStream API. In this page, we explained the code line by line, but missed 
> the last line:
> {code:java}
> env.execute("Fraud Detection");{code}
>  
> I confirmed this existed before 
> [v1.12|https://github.com/apache/flink/blob/release-1.12/docs/try-flink/datastream_api.md?plain=1#L365],
>  but when we migrated the Flink docs from Jekyll to Hugo in this 
> [PR|https://github.com/apache/flink/pull/14903], we missed that section. This 
> omission means first-time Flink users may miss the crucial line that actually 
> executes the Flink job.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35654) Add Flink CDC verification guide in docs

2024-06-19 Thread yux (Jira)
yux created FLINK-35654:
---

 Summary: Add Flink CDC verification guide in docs
 Key: FLINK-35654
 URL: https://issues.apache.org/jira/browse/FLINK-35654
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Currently, the ASF voting process requires extensive quality verification 
before releasing any new version, including:
 * Tarball checksum verification (see the sketch below)
 * Compile from source code
 * Run pipeline E2e tests
 * Run migration tests
 * Check if jar was packaged with correct JDK version
 * ...

Adding a verification guide to the Flink CDC docs should help developers 
verify future releases more easily.
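
As an example, the tarball checksum step can be scripted roughly like this (a 
minimal sketch; the file names are placeholders and HexFormat requires Java 
17+):
{code:java}
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.util.HexFormat;

public class ChecksumCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder artifact names; releases ship a matching .sha512 file.
        byte[] tarball = Files.readAllBytes(Path.of("flink-cdc-3.1.1-bin.tar.gz"));
        String actual = HexFormat.of()
                .formatHex(MessageDigest.getInstance("SHA-512").digest(tarball));
        String expected = Files.readString(Path.of("flink-cdc-3.1.1-bin.tar.gz.sha512"))
                .trim().split("\\s+")[0];
        System.out.println(actual.equalsIgnoreCase(expected) ? "OK" : "MISMATCH");
    }
}
{code}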



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35630) Kafka source may reset the consume offset to earliest when the partition leader changes

2024-06-19 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856378#comment-17856378
 ] 

tanjialiang commented on FLINK-35630:
-

[~martijnvisser] Your point makes a lot of sense.

In theory, the Kafka consumer should only consume data below the high 
watermark; the problem I'm hitting is that after a partition leader change, 
the consumer offset goes out of range.

Our Kafka cluster is currently on version 2.3.0, and after much 
troubleshooting we believe we are hitting 
https://issues.apache.org/jira/browse/KAFKA-9835.
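
For reference, a minimal sketch of the source setup in question (broker, 
topic, and group names are made up):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class EarliestOffsetSource {
    public static KafkaSource<String> build() {
        // With earliest-offset startup the connector also pins the consumer's
        // auto.offset.reset to earliest, so an out-of-range fetch after a
        // leader change falls back to the beginning of the log.
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // made-up address
                .setTopics("orders")                  // made-up topic
                .setGroupId("flink-consumer")         // made-up group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}
{code}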

> Kafka source may reset the consume offset to earliest when the partition 
> leader changes
> ---
>
> Key: FLINK-35630
> URL: https://issues.apache.org/jira/browse/FLINK-35630
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.2.0
>Reporter: tanjialiang
>Priority: Major
>
> A Kafka producer uses the acks=1 option to write data to a topic.
> The Flink Kafka source starts with the *scan.startup.mode=earliest-offset* 
> option (the Kafka *auto.offset.reset* option is then force-overridden to 
> earliest).
> If a partition leader becomes unavailable, a follower may become the new 
> leader, which can trigger log truncation. This may cause consumers to fetch 
> an offset that is out of range and fall back to the *auto.offset.reset* 
> strategy to reset the offset.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35655) Support more pipeline sources

2024-06-19 Thread linqigeng (Jira)
linqigeng created FLINK-35655:
-

 Summary: Support more pipeline sources
 Key: FLINK-35655
 URL: https://issues.apache.org/jira/browse/FLINK-35655
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: linqigeng


Only the MySQL pipeline source is supported currently; more source 
implementations are expected, such as MongoDB, Oracle, PostgreSQL, etc.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35642) parsing the configuration file in flink, when configuring the s3 key, if it contains a # char, the correct key will be split

2024-06-19 Thread blackpighe (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

blackpighe updated FLINK-35642:
---
Fix Version/s: 1.18.2
   (was: 1.19.1)

> parsing the configuration file in flink, when configuring the s3 key, if 
> it contains a # char, the correct key will be split
> ---
>
> Key: FLINK-35642
> URL: https://issues.apache.org/jira/browse/FLINK-35642
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.3, 1.19.0, 1.20.0, 1.19.1
>Reporter: blackpighe
>Priority: Major
> Fix For: 1.18.2
>
> Attachments: image-2024-06-19-11-45-05-871.png
>
>
> When parsing the configuration file in Flink, if the configured s3 key value 
> contains a # char, the value will be split at that character.
>  
> For example:
>  
> s3.secret-key: Minio#dct@78
>  
>  
> !image-2024-06-19-11-45-05-871.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35642) parsing the configuration file in flink, when configuring the s3 key, if it contains a # char, the correct key will be split

2024-06-19 Thread blackpighe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856397#comment-17856397
 ] 

blackpighe commented on FLINK-35642:


[~JunRuiLi] This feature is only supported in 1.19, and I'm using a lower 
version. I want to solve this problem on 1.18. Can we discuss it?

> parsing the configuration file in flink, when configuring the s3 key, if 
> it contains a # char, the correct key will be split
> ---
>
> Key: FLINK-35642
> URL: https://issues.apache.org/jira/browse/FLINK-35642
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.3, 1.19.0, 1.20.0, 1.19.1
>Reporter: blackpighe
>Priority: Major
> Fix For: 1.18.2
>
> Attachments: image-2024-06-19-11-45-05-871.png
>
>
> When parsing the configuration file in Flink, if the configured s3 key value 
> contains a # char, the value will be split at that character.
>  
> For example:
>  
> s3.secret-key: Minio#dct@78
>  
>  
> !image-2024-06-19-11-45-05-871.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35642) parsing the configuration file in flink, when configuring the s3 key, if it contains a # char, the correct key will be split

2024-06-19 Thread blackpighe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856398#comment-17856398
 ] 

blackpighe commented on FLINK-35642:


The main reason is that the problem is deeply hidden and difficult to locate.
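
A toy illustration of why it is hard to spot (this mimics naive '#' comment 
stripping; it is not Flink's actual parser code):
{code:java}
public class HashCommentDemo {
    public static void main(String[] args) {
        // Treating '#' as a comment marker anywhere on the line silently
        // truncates the configured secret.
        String line = "s3.secret-key: Minio#dct@78";
        int hash = line.indexOf('#');
        String parsed = (hash >= 0 ? line.substring(0, hash) : line).trim();
        System.out.println(parsed); // prints "s3.secret-key: Minio"
    }
}
{code}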

> parsing the configuration file in flink, when configuring the s3 key, if 
> it contains a # char, the correct key will be split
> ---
>
> Key: FLINK-35642
> URL: https://issues.apache.org/jira/browse/FLINK-35642
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.3, 1.19.0, 1.20.0, 1.19.1
>Reporter: blackpighe
>Priority: Major
> Fix For: 1.18.2
>
> Attachments: image-2024-06-19-11-45-05-871.png
>
>
> When parsing the configuration file in Flink, if the configured s3 key value 
> contains a # char, the value will be split at that character.
>  
> For example:
>  
> s3.secret-key: Minio#dct@78
>  
>  
> !image-2024-06-19-11-45-05-871.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)