[jira] [Commented] (FLINK-35631) KafkaSource parameter partition.discovery.interval.ms with a default value of 5 minutes does not take effect
[ https://issues.apache.org/jira/browse/FLINK-35631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856151#comment-17856151 ] Martijn Visser commented on FLINK-35631: [~renqs] Given that you worked on the FLIP, can you give your thoughts on this one? > KafkaSource parameter partition.discovery.interval.ms with a default value of > 5 minutes does not take effect > > > Key: FLINK-35631 > URL: https://issues.apache.org/jira/browse/FLINK-35631 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.1.0 >Reporter: elon_X >Priority: Major > > When I start a stream program to consume Kafka > (flink-connector-kafka-3.1-SNAPSHOT) the Flink task does not automatically > detect new partitions after Kafka adds partitions. > > *Reason* > In the {{KafkaSourceBuilder}}, this parameter is checked to see if it has > been overridden. Since I did not set this parameter, even though it is > {{CONTINUOUS_UNBOUNDED}}, it still sets > {{partition.discovery.interval.ms = -1}}. > In the {{KafkaSourceEnumerator}}, the value of > {{partition.discovery.interval.ms}} is then -1, instead of the default value > of 5 minutes, so automatic partition discovery does not work, and the default > value of 5 minutes for {{partition.discovery.interval.ms}} is meaningless. > > A possible solution is to set {{partition.discovery.interval.ms = -1}} only > if {{boundedness == Boundedness.BOUNDED}} is true. -- This message was sent by Atlassian Jira (v8.20.10#820010)
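The defaulting behaviour reported in FLINK-35631 and the proposed change can be illustrated with a small, dependency-free model. This is only a sketch, not the connector's actual code: the class `PartitionDiscoveryDefaults`, its two methods, and the local `Boundedness` enum are hypothetical names introduced for illustration. Only the property key, the `-1` "disabled" value, and the 5 minute default come from the report. That an explicit user setting always takes precedence is an assumption.

```java
import java.util.Properties;

// Hypothetical model of the defaulting logic discussed in FLINK-35631.
// It does not use or reproduce the real KafkaSourceBuilder.
class PartitionDiscoveryDefaults {

    // Local stand-in for Flink's Boundedness enum (assumption: only these two values matter here).
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    static final String KEY = "partition.discovery.interval.ms";
    static final long DISABLED = -1L;
    static final long DEFAULT_INTERVAL_MS = 5 * 60 * 1000L; // the 5 minute default named in the issue title

    // Behaviour as described by the reporter: if the user did not set the key,
    // discovery is disabled regardless of boundedness.
    static long reportedBehaviour(Properties userProps, Boundedness boundedness) {
        if (!userProps.containsKey(KEY)) {
            return DISABLED;
        }
        return Long.parseLong(userProps.getProperty(KEY));
    }

    // Proposed behaviour: disable discovery by default only for bounded sources,
    // otherwise fall back to the 5 minute default.
    static long proposedBehaviour(Properties userProps, Boundedness boundedness) {
        if (userProps.containsKey(KEY)) {
            return Long.parseLong(userProps.getProperty(KEY)); // assumption: explicit setting wins
        }
        return boundedness == Boundedness.BOUNDED ? DISABLED : DEFAULT_INTERVAL_MS;
    }

    public static void main(String[] args) {
        Properties unset = new Properties();
        System.out.println(reportedBehaviour(unset, Boundedness.CONTINUOUS_UNBOUNDED)); // -1
        System.out.println(proposedBehaviour(unset, Boundedness.CONTINUOUS_UNBOUNDED)); // 300000
        System.out.println(proposedBehaviour(unset, Boundedness.BOUNDED));              // -1
    }
}
```

With no user setting, the unbounded case yields -1 under the reported behaviour and 300000 ms under the proposed one, which is the difference the reporter is asking for.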
[jira] [Commented] (FLINK-35629) Performance regression in stringRead and stringWrite
[ https://issues.apache.org/jira/browse/FLINK-35629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856172#comment-17856172 ] Piotr Nowojski commented on FLINK-35629: It's strange, why JMH upgrade is causing this? Isn't this yet another case that JIT is going crazy due to some unrelated change? Anyway +1 for just accepting it. > Performance regression in stringRead and stringWrite > > > Key: FLINK-35629 > URL: https://issues.apache.org/jira/browse/FLINK-35629 > Project: Flink > Issue Type: Bug > Components: Benchmarks >Affects Versions: 1.20.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Blocker > Fix For: 1.20.0 > > Attachments: image-2024-06-18-14-52-55-164.png > > > [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.ascii&extr=on&quarts=on&equid=off&env=3&revs=200] > [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.chinese&extr=on&quarts=on&equid=off&env=3&revs=200] > [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.russian&extr=on&quarts=on&equid=off&env=3&revs=200] > [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.128.chinese&extr=on&quarts=on&equid=off&env=3&revs=200] > [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.ascii&extr=on&quarts=on&equid=off&env=3&revs=200] > [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.chinese&extr=on&quarts=on&equid=off&env=3&revs=200] > [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.russian&extr=on&quarts=on&equid=off&env=3&revs=200] > !image-2024-06-18-14-52-55-164.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-35619) Window rank query fails with "must call validate first"
[ https://issues.apache.org/jira/browse/FLINK-35619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz closed FLINK-35619. Resolution: Fixed Fixed in e91d4a16305ddf7d4e34f45c9eec9c0af454a1d4 > Window rank query fails with "must call validate first" > --- > > Key: FLINK-35619 > URL: https://issues.apache.org/jira/browse/FLINK-35619 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > A program: > {code} > static final TableTestProgram WINDOW_RANK_HOP_TVF_NAMED_MIN_TOP_1 = > TableTestProgram.of( > "window-rank-hop-tvf-named-min-top-n", > "validates window min top-n follows after hop > window") > .setupTableSource(SourceTestStep.newBuilder("bid_t") > .addSchema( > "ts STRING", > "price DECIMAL(10,2)", > "supplier_id STRING", > "`bid_time` AS TO_TIMESTAMP(`ts`)", > "WATERMARK for `bid_time` AS `bid_time` - > INTERVAL '1' SECOND") > .producedValues( > Row.of( > "2020-04-15 08:00:05", > new BigDecimal(4.00), > "supplier1")) > .build()) > .setupTableSink( > SinkTestStep.newBuilder("sink_t") > .addSchema("bid_time TIMESTAMP(3)", > "supplier_id STRING") > .consumedValues( > "+I[2020-04-15T08:00:05, > supplier1]", > "+I[2020-04-15T08:00:05, > supplier1]") > .build()) > .runSql("INSERT INTO sink_t(bid_time, supplier_id) " > + "SELECT bid_time, supplier_id\n" > + " FROM (\n" > + "SELECT\n" > + " bid_time,\n" > + " supplier_id,\n" > + " ROW_NUMBER() OVER (PARTITION BY > window_start, window_end ORDER BY price ASC) AS row_num\n" > + "FROM TABLE(HOP(\n" > + " DATA => TABLE bid_t,\n" > + " TIMECOL => DESCRIPTOR(`bid_time`),\n" > + " SLIDE => INTERVAL '5' SECOND,\n" > + " SIZE => INTERVAL '10' SECOND))\n" > + " ) WHERE row_num <= 3") > .build(); > {code} > fails with: > {code} > java.lang.AssertionError: must call validate first > at > 
org.apache.calcite.sql.validate.IdentifierNamespace.resolve(IdentifierNamespace.java:256) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2871) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2464) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2378) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2323) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:730) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:716) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3880) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryOrInList(SqlToRelConverter.java:1912) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertExists(SqlToRelConverter.java:1895) > at > org.apache.calcite.sql2rel.SqlToRelConverter.substituteSubQuery(SqlToRelConverter.java:1421) > at > org.apache.calcite.sql2rel.SqlToRelConverter.replaceSubQueries(SqlToRelConverter.java:1161) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertCollectionTable(SqlToRelConverter.java:2928) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2511) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2378) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConve
[jira] [Assigned] (FLINK-35648) Pipeline job doesn't support multiple routing
[ https://issues.apache.org/jira/browse/FLINK-35648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruan Hang reassigned FLINK-35648: - Assignee: yux > Pipeline job doesn't support multiple routing > - > > Key: FLINK-35648 > URL: https://issues.apache.org/jira/browse/FLINK-35648 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Assignee: yux >Priority: Major > > Currently, any upstream table can be routed at most once, which means that if we write > a route definition such as: > routes: > - source-table: db.(A|B) > sink-table: terminal.one > - source-table: db.(B|C) > sink-table: terminal.two > any upstream schema / data changes from db.B will be sent to terminal.one > *only*, not to terminal.two, since they have already been handled by the first route > rule. > This ticket suggests adding a route behavior option (FIRST_MATCH / COMPLETE) > to configure whether all route rules should be applied or only the first matched > rule (for backwards compatibility). -- This message was sent by Atlassian Jira (v8.20.10#820010)
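The difference between the two route behaviors suggested in FLINK-35648 can be sketched with a small standalone model. This is not Flink CDC code: the class `RouteSketch`, the `Route` holder, and `resolveSinks` are hypothetical names, and matching table names with `java.util.regex` (with the dot escaped) is an assumption made for illustration. Only the option names FIRST_MATCH and COMPLETE and the example routes come from the ticket.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical model of FIRST_MATCH vs COMPLETE routing from FLINK-35648.
class RouteSketch {

    enum RouteBehavior { FIRST_MATCH, COMPLETE }

    // One route rule: a source-table pattern and the sink table it maps to.
    static final class Route {
        final Pattern source;
        final String sink;

        Route(String sourceRegex, String sink) {
            this.source = Pattern.compile(sourceRegex);
            this.sink = sink;
        }
    }

    // Returns the sink tables that changes from the given source table are sent to.
    static List<String> resolveSinks(List<Route> routes, String table, RouteBehavior behavior) {
        List<String> sinks = new ArrayList<>();
        for (Route route : routes) {
            if (route.source.matcher(table).matches()) {
                sinks.add(route.sink);
                if (behavior == RouteBehavior.FIRST_MATCH) {
                    break; // current behaviour: stop at the first matching rule
                }
            }
        }
        return sinks;
    }

    public static void main(String[] args) {
        List<Route> routes = Arrays.asList(
                new Route("db\\.(A|B)", "terminal.one"),
                new Route("db\\.(B|C)", "terminal.two"));

        System.out.println(resolveSinks(routes, "db.B", RouteBehavior.FIRST_MATCH)); // [terminal.one]
        System.out.println(resolveSinks(routes, "db.B", RouteBehavior.COMPLETE));    // [terminal.one, terminal.two]
    }
}
```

Keeping FIRST_MATCH as the default in such a design would preserve the existing behaviour, which is the backwards-compatibility concern the ticket mentions.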
[jira] [Commented] (FLINK-35648) Pipeline job doesn't support multiple routing
[ https://issues.apache.org/jira/browse/FLINK-35648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856216#comment-17856216 ] Ruan Hang commented on FLINK-35648: --- [~xiqian_yu] This is an important improvement. Thanks for working on it. I have assigned it to you. > Pipeline job doesn't support multiple routing > - > > Key: FLINK-35648 > URL: https://issues.apache.org/jira/browse/FLINK-35648 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Assignee: yux >Priority: Major > > Currently, any upstream table can be routed at most once, which means that if we write > a route definition such as: > routes: > - source-table: db.(A|B) > sink-table: terminal.one > - source-table: db.(B|C) > sink-table: terminal.two > any upstream schema / data changes from db.B will be sent to terminal.one > *only*, not to terminal.two, since they have already been handled by the first route > rule. > This ticket suggests adding a route behavior option (FIRST_MATCH / COMPLETE) > to configure whether all route rules should be applied or only the first matched > rule (for backwards compatibility). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35636) Streaming File Sink s3 end-to-end test did not finish after 900 seconds
[ https://issues.apache.org/jira/browse/FLINK-35636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856222#comment-17856222 ] Weijie Guo commented on FLINK-35636: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60360&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=9598 > Streaming File Sink s3 end-to-end test did not finish after 900 seconds > --- > > Key: FLINK-35636 > URL: https://issues.apache.org/jira/browse/FLINK-35636 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.17.2 >Reporter: Weijie Guo >Priority: Major > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60335&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=4916 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35649) Bump Flink version to 1.19.1 in k8s-operator
Ferenc Csaky created FLINK-35649: Summary: Bump Flink version to 1.19.1 in k8s-operator Key: FLINK-35649 URL: https://issues.apache.org/jira/browse/FLINK-35649 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Ferenc Csaky Fix For: kubernetes-operator-1.10.0 In FLINK-28915 it came up that the operator cannot utilize the artifact fetching capabilities that were introduced in Flink 1.19 until it is built on that version. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35639) upgrade to 1.19 with job in HA state with restart strategy crashes job manager
[ https://issues.apache.org/jira/browse/FLINK-35639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856230#comment-17856230 ] Matthias Pohl commented on FLINK-35639: --- [~chesnay] pointed me to the actual issue because I was wondering why the change in FLINK-32570 was actually "overlooked" by our {{japicmp}} checks. The problem is that you're actually not following the supported process (as documented in the [Flink docs|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#restarting-streaming-applications]). That results in incompatibilities of internal APIs (the constructor in question is package-private). Please use savepoints to migrate jobs. There are other internal APIs (the JobGraph itself isn't a stable API, either) that might cause problems in your upgrade process. # Create a savepoint of the job in the old version. # Start the Flink cluster with the upgraded Flink version. # Submit the job using the created savepoint to restart the job. > upgrade to 1.19 with job in HA state with restart strategy crashes job manager > -- > > Key: FLINK-35639 > URL: https://issues.apache.org/jira/browse/FLINK-35639 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.20.0, 1.19.1 > Environment: Download 1.18 and 1.19 binary releases. 
Add the > following to flink-1.19.0/conf/config.yaml and > flink-1.18.1/conf/flink-conf.yaml ```yaml high-availability: zookeeper > high-availability.zookeeper.quorum: localhost high-availability.storageDir: > file:///tmp/flink/recovery ``` Launch zookeeper: docker run --network host > zookeeper:latest launch 1.18 task manager: ./flink-1.18.1/bin/taskmanager.sh > start-foreground launch 1.18 job manager: ./flink-1.18.1/bin/jobmanager.sh > start-foreground launch the following job: ```java import > org.apache.flink.api.java.ExecutionEnvironment; import > org.apache.flink.api.java.tuple.Tuple2; import > org.apache.flink.api.common.functions.FlatMapFunction; import > org.apache.flink.util.Collector; import > org.apache.flink.api.common.restartstrategy.RestartStrategies; import > org.apache.flink.api.common.time.Time; import java.util.concurrent.TimeUnit; > public class FlinkJob \{ public static void main(String[] args) throws > Exception { final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy( > RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, > TimeUnit.SECONDS)) ); env.fromElements("Hello World", "Hello Flink") > .flatMap(new LineSplitter()) .groupBy(0) .sum(1) .print(); } public static > final class LineSplitter implements FlatMapFunction> \{ @Override public void > flatMap(String value, Collector> out) { for (String word : value.split(" ")) > { try { Thread.sleep(12); } catch (InterruptedException e) \{ > e.printStackTrace(); } out.collect(new Tuple2<>(word, 1)); } } } } ``` ```xml > 4.0.0 org.apache.flink myflinkjob 1.0-SNAPSHOT 1.18.1 1.8 org.apache.flink > flink-java ${flink.version} org.apache.flink flink-streaming-java > ${flink.version} org.apache.maven.plugins maven-compiler-plugin 3.8.1 > ${java.version} ${java.version} org.apache.maven.plugins maven-jar-plugin > 3.1.0 true lib/ FlinkJob ``` Launch job: ./flink-1.18.1/bin/flink run > ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar Job 
has been submitted with > JobID 5f0898c964a93a47aa480427f3e2c6c0 Kill job manager and task manager. > Then launch job manager 1.19.0 ./flink-1.19.0/bin/jobmanager.sh > start-foreground Root cause == It looks like the type of > delayBetweenAttemptsInterval was changed in 1.19 > https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239 > , introducing an incompatibility which is not handled by flink 1.19. In my > opinion, job-manager should not crash when starting in that case. >Reporter: yazgoo >Assignee: Matthias Pohl >Priority: Blocker > Labels: pull-request-available > > When trying to upgrade a flink cluster from 1.18 to 1.19, with a 1.18 job in > zookeeper HA state, I have a jobmanager crash with a ClassCastException, see > log below > > {code:java} > 2024-06-18 16:58:14,401 ERROR > org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error > occurred in the cluster entrypoint. org.apache.flink.util.FlinkException: > JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed. at > org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775) > ~[flink-dist-1.19.0.jar:1.19.0] at
[jira] [Comment Edited] (FLINK-35639) upgrade to 1.19 with job in HA state with restart strategy crashes job manager
[ https://issues.apache.org/jira/browse/FLINK-35639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856230#comment-17856230 ] Matthias Pohl edited comment on FLINK-35639 at 6/19/24 9:47 AM: [~chesnay] pointed me to the actual issue because I was wondering why the change in FLINK-32570 was actually "overlooked" by our {{japicmp}} checks. The problem is that you're actually not following the supported process (as documented in the [Flink docs|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#restarting-streaming-applications]). That results in incompatibilities of internal APIs (the constructor in question is package-private). Please use savepoints to migrate jobs. There are other internal APIs (the JobGraph itself isn't a stable API, either) that might cause problems in your upgrade process. # Create a savepoint of the job in the old version. # Start the Flink cluster with the upgraded Flink version. # Submit the job using the created savepoint to restart the job using the job client of the new Flink binaries (to allow for proper JobGraph creation). was (Author: mapohl): [~chesnay] pointed me to the actual issue because I was wondering why the change in FLINK-32570 was actually "overlooked" by our {{japicmp}} checks. The problem is that you're actually not following the supported process (as documented in the [Flink docs|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#restarting-streaming-applications]). That results in incompatibilities of internal APIs (the constructor in question is package-private). Please use savepoints to migrate jobs. There are other internal APIs (the JobGraph itself isn't a stable API, either) that might cause problems in your upgrade process. # Create a savepoint of the job in the old version. # Start the Flink cluster with the upgraded Flink version. # Submit the job using the created savepoint to restart the job. 
> upgrade to 1.19 with job in HA state with restart strategy crashes job manager > -- > > Key: FLINK-35639 > URL: https://issues.apache.org/jira/browse/FLINK-35639 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.20.0, 1.19.1 > Environment: Download 1.18 and 1.19 binary releases. Add the > following to flink-1.19.0/conf/config.yaml and > flink-1.18.1/conf/flink-conf.yaml ```yaml high-availability: zookeeper > high-availability.zookeeper.quorum: localhost high-availability.storageDir: > file:///tmp/flink/recovery ``` Launch zookeeper: docker run --network host > zookeeper:latest launch 1.18 task manager: ./flink-1.18.1/bin/taskmanager.sh > start-foreground launch 1.18 job manager: ./flink-1.18.1/bin/jobmanager.sh > start-foreground launch the following job: ```java import > org.apache.flink.api.java.ExecutionEnvironment; import > org.apache.flink.api.java.tuple.Tuple2; import > org.apache.flink.api.common.functions.FlatMapFunction; import > org.apache.flink.util.Collector; import > org.apache.flink.api.common.restartstrategy.RestartStrategies; import > org.apache.flink.api.common.time.Time; import java.util.concurrent.TimeUnit; > public class FlinkJob \{ public static void main(String[] args) throws > Exception { final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy( > RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, > TimeUnit.SECONDS)) ); env.fromElements("Hello World", "Hello Flink") > .flatMap(new LineSplitter()) .groupBy(0) .sum(1) .print(); } public static > final class LineSplitter implements FlatMapFunction> \{ @Override public void > flatMap(String value, Collector> out) { for (String word : value.split(" ")) > { try { Thread.sleep(12); } catch (InterruptedException e) \{ > e.printStackTrace(); } out.collect(new Tuple2<>(word, 1)); } } } } ``` ```xml > 4.0.0 org.apache.flink myflinkjob 1.0-SNAPSHOT 1.18.1 1.8 org.apache.flink > flink-java ${flink.version} 
org.apache.flink flink-streaming-java > ${flink.version} org.apache.maven.plugins maven-compiler-plugin 3.8.1 > ${java.version} ${java.version} org.apache.maven.plugins maven-jar-plugin > 3.1.0 true lib/ FlinkJob ``` Launch job: ./flink-1.18.1/bin/flink run > ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar Job has been submitted with > JobID 5f0898c964a93a47aa480427f3e2c6c0 Kill job manager and task manager. > Then launch job manager 1.19.0 ./flink-1.19.0/bin/jobmanager.sh > start-foreground Root cause == It looks like the type of > delayBetweenAttemptsInterval was changed in 1.19 > https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239 > , introducing an incompat
[jira] [Closed] (FLINK-35639) upgrade to 1.19 with job in HA state with restart strategy crashes job manager
[ https://issues.apache.org/jira/browse/FLINK-35639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl closed FLINK-35639. - Resolution: Not A Problem I'm closing the issue and the related PRs because we're actually not supporting this kind of version upgrades in general. Fixing the {{RestartStrategy}} issue wouldn't necessarily solve the issue. > upgrade to 1.19 with job in HA state with restart strategy crashes job manager > -- > > Key: FLINK-35639 > URL: https://issues.apache.org/jira/browse/FLINK-35639 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.20.0, 1.19.1 > Environment: Download 1.18 and 1.19 binary releases. Add the > following to flink-1.19.0/conf/config.yaml and > flink-1.18.1/conf/flink-conf.yaml ```yaml high-availability: zookeeper > high-availability.zookeeper.quorum: localhost high-availability.storageDir: > file:///tmp/flink/recovery ``` Launch zookeeper: docker run --network host > zookeeper:latest launch 1.18 task manager: ./flink-1.18.1/bin/taskmanager.sh > start-foreground launch 1.18 job manager: ./flink-1.18.1/bin/jobmanager.sh > start-foreground launch the following job: ```java import > org.apache.flink.api.java.ExecutionEnvironment; import > org.apache.flink.api.java.tuple.Tuple2; import > org.apache.flink.api.common.functions.FlatMapFunction; import > org.apache.flink.util.Collector; import > org.apache.flink.api.common.restartstrategy.RestartStrategies; import > org.apache.flink.api.common.time.Time; import java.util.concurrent.TimeUnit; > public class FlinkJob \{ public static void main(String[] args) throws > Exception { final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy( > RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, > TimeUnit.SECONDS)) ); env.fromElements("Hello World", "Hello Flink") > .flatMap(new LineSplitter()) .groupBy(0) .sum(1) .print(); } public static > final class LineSplitter 
implements FlatMapFunction> \{ @Override public void > flatMap(String value, Collector> out) { for (String word : value.split(" ")) > { try { Thread.sleep(12); } catch (InterruptedException e) \{ > e.printStackTrace(); } out.collect(new Tuple2<>(word, 1)); } } } } ``` ```xml > 4.0.0 org.apache.flink myflinkjob 1.0-SNAPSHOT 1.18.1 1.8 org.apache.flink > flink-java ${flink.version} org.apache.flink flink-streaming-java > ${flink.version} org.apache.maven.plugins maven-compiler-plugin 3.8.1 > ${java.version} ${java.version} org.apache.maven.plugins maven-jar-plugin > 3.1.0 true lib/ FlinkJob ``` Launch job: ./flink-1.18.1/bin/flink run > ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar Job has been submitted with > JobID 5f0898c964a93a47aa480427f3e2c6c0 Kill job manager and task manager. > Then launch job manager 1.19.0 ./flink-1.19.0/bin/jobmanager.sh > start-foreground Root cause == It looks like the type of > delayBetweenAttemptsInterval was changed in 1.19 > https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239 > , introducing an incompatibility which is not handled by flink 1.19. In my > opinion, job-manager should not crash when starting in that case. >Reporter: yazgoo >Assignee: Matthias Pohl >Priority: Blocker > Labels: pull-request-available > > When trying to upgrade a flink cluster from 1.18 to 1.19, with a 1.18 job in > zookeeper HA state, I have a jobmanager crash with a ClassCastException, see > log below > > {code:java} > 2024-06-18 16:58:14,401 ERROR > org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error > occurred in the cluster entrypoint. org.apache.flink.util.FlinkException: > JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed. 
at > org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:738) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$7(Dispatcher.java:693) > ~[flink-dist-1.19.0.jar:1.19.0] at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) > ~[?:?] at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) > ~[?:?] at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) > ~[?:?] at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(P
[jira] [Updated] (FLINK-35639) upgrade to 1.19 with job in HA state with restart strategy crashes job manager
[ https://issues.apache.org/jira/browse/FLINK-35639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-35639: -- Priority: Major (was: Blocker) > upgrade to 1.19 with job in HA state with restart strategy crashes job manager > -- > > Key: FLINK-35639 > URL: https://issues.apache.org/jira/browse/FLINK-35639 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.20.0, 1.19.1 > Environment: Download 1.18 and 1.19 binary releases. Add the > following to flink-1.19.0/conf/config.yaml and > flink-1.18.1/conf/flink-conf.yaml ```yaml high-availability: zookeeper > high-availability.zookeeper.quorum: localhost high-availability.storageDir: > file:///tmp/flink/recovery ``` Launch zookeeper: docker run --network host > zookeeper:latest launch 1.18 task manager: ./flink-1.18.1/bin/taskmanager.sh > start-foreground launch 1.18 job manager: ./flink-1.18.1/bin/jobmanager.sh > start-foreground launch the following job: ```java import > org.apache.flink.api.java.ExecutionEnvironment; import > org.apache.flink.api.java.tuple.Tuple2; import > org.apache.flink.api.common.functions.FlatMapFunction; import > org.apache.flink.util.Collector; import > org.apache.flink.api.common.restartstrategy.RestartStrategies; import > org.apache.flink.api.common.time.Time; import java.util.concurrent.TimeUnit; > public class FlinkJob \{ public static void main(String[] args) throws > Exception { final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy( > RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, > TimeUnit.SECONDS)) ); env.fromElements("Hello World", "Hello Flink") > .flatMap(new LineSplitter()) .groupBy(0) .sum(1) .print(); } public static > final class LineSplitter implements FlatMapFunction> \{ @Override public void > flatMap(String value, Collector> out) { for (String word : value.split(" ")) > { try { Thread.sleep(12); } catch (InterruptedException e) \{ 
> e.printStackTrace(); } out.collect(new Tuple2<>(word, 1)); } } } } ``` ```xml > 4.0.0 org.apache.flink myflinkjob 1.0-SNAPSHOT 1.18.1 1.8 org.apache.flink > flink-java ${flink.version} org.apache.flink flink-streaming-java > ${flink.version} org.apache.maven.plugins maven-compiler-plugin 3.8.1 > ${java.version} ${java.version} org.apache.maven.plugins maven-jar-plugin > 3.1.0 true lib/ FlinkJob ``` Launch job: ./flink-1.18.1/bin/flink run > ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar Job has been submitted with > JobID 5f0898c964a93a47aa480427f3e2c6c0 Kill job manager and task manager. > Then launch job manager 1.19.0 ./flink-1.19.0/bin/jobmanager.sh > start-foreground Root cause == It looks like the type of > delayBetweenAttemptsInterval was changed in 1.19 > https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239 > , introducing an incompatibility which is not handled by flink 1.19. In my > opinion, job-manager should not crash when starting in that case. >Reporter: yazgoo >Assignee: Matthias Pohl >Priority: Major > Labels: pull-request-available > > When trying to upgrade a flink cluster from 1.18 to 1.19, with a 1.18 job in > zookeeper HA state, I have a jobmanager crash with a ClassCastException, see > log below > > {code:java} > 2024-06-18 16:58:14,401 ERROR > org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error > occurred in the cluster entrypoint. org.apache.flink.util.FlinkException: > JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed. 
at > org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:738) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$7(Dispatcher.java:693) > ~[flink-dist-1.19.0.jar:1.19.0] at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) > ~[?:?] at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) > ~[?:?] at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) > ~[?:?] at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) > ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingU
[jira] [Comment Edited] (FLINK-35639) upgrade to 1.19 with job in HA state with restart strategy crashes job manager
[ https://issues.apache.org/jira/browse/FLINK-35639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856230#comment-17856230 ] Matthias Pohl edited comment on FLINK-35639 at 6/19/24 9:53 AM: [~chesnay] pointed me to the actual issue. I was initially wondering why the change in FLINK-32570 was actually "overlooked" by our {{japicmp}} checks. The problem is that you're actually not following the supported process (as documented in the [Flink docs|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#restarting-streaming-applications]). That results in incompatibilities of internal APIs (the constructor in question is package-private). Please use savepoints to migrate jobs. There are other internal APIs (the JobGraph itself isn't a stable API, either) that might cause problems in your upgrade process. # Create a savepoint of the job in the old version. # Start the Flink cluster with the upgraded Flink version. # Submit the job using the created savepoint to restart the job using the job client of the new Flink binaries (to allow for proper JobGraph creation). was (Author: mapohl): [~chesnay] pointed me to the actual issue because I was wondering why the change in FLINK-32570 was actually "overlooked" by our {{japicmp}} checks. The problem is that you're actually not following the supported process (as documented in the [Flink docs|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#restarting-streaming-applications]). That results in incompatibilities of internal APIs (the constructor in question is package-private). Please use savepoints to migrate jobs. There are other internal APIs (the JobGraph itself isn't a stable API, either) that might cause problems in your upgrade process. # Create a savepoint of the job in the old version. # Start the Flink cluster with the upgraded Flink version. 
# Submit the job using the created savepoint to restart the job using the job client of the new Flink binaries (to allow for proper JobGraph creation). > upgrade to 1.19 with job in HA state with restart strategy crashes job manager > -- > > Key: FLINK-35639 > URL: https://issues.apache.org/jira/browse/FLINK-35639 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.20.0, 1.19.1 > Environment: Download 1.18 and 1.19 binary releases. Add the > following to flink-1.19.0/conf/config.yaml and > flink-1.18.1/conf/flink-conf.yaml ```yaml high-availability: zookeeper > high-availability.zookeeper.quorum: localhost high-availability.storageDir: > file:///tmp/flink/recovery ``` Launch zookeeper: docker run --network host > zookeeper:latest launch 1.18 task manager: ./flink-1.18.1/bin/taskmanager.sh > start-foreground launch 1.18 job manager: ./flink-1.18.1/bin/jobmanager.sh > start-foreground launch the following job: ```java import > org.apache.flink.api.java.ExecutionEnvironment; import > org.apache.flink.api.java.tuple.Tuple2; import > org.apache.flink.api.common.functions.FlatMapFunction; import > org.apache.flink.util.Collector; import > org.apache.flink.api.common.restartstrategy.RestartStrategies; import > org.apache.flink.api.common.time.Time; import java.util.concurrent.TimeUnit; > public class FlinkJob { public static void main(String[] args) throws > Exception { final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy( > RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, > TimeUnit.SECONDS)) ); env.fromElements("Hello World", "Hello Flink") > .flatMap(new LineSplitter()) .groupBy(0) .sum(1) .print(); } public static > final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void > flatMap(String value, Collector<Tuple2<String, Integer>> out) { for (String word : value.split(" ")) > { try { Thread.sleep(12); } catch (InterruptedException e) { > e.printStackTrace(); } 
out.collect(new Tuple2<>(word, 1)); } } } } ``` ```xml > 4.0.0 org.apache.flink myflinkjob 1.0-SNAPSHOT 1.18.1 1.8 org.apache.flink > flink-java ${flink.version} org.apache.flink flink-streaming-java > ${flink.version} org.apache.maven.plugins maven-compiler-plugin 3.8.1 > ${java.version} ${java.version} org.apache.maven.plugins maven-jar-plugin > 3.1.0 true lib/ FlinkJob ``` Launch job: ./flink-1.18.1/bin/flink run > ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar Job has been submitted with > JobID 5f0898c964a93a47aa480427f3e2c6c0 Kill job manager and task manager. > Then launch job manager 1.19.0 ./flink-1.19.0/bin/jobmanager.sh > start-foreground Root cause == It looks like the type of > delayBetweenAttemptsInterval was changed in 1.19 > https://github.com/apache/flink/pull/22984/files#diff-d174f3
[jira] [Closed] (FLINK-35629) Performance regression in stringRead and stringWrite
[ https://issues.apache.org/jira/browse/FLINK-35629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan closed FLINK-35629. --- Resolution: Won't Fix > Performance regression in stringRead and stringWrite > > > Key: FLINK-35629 > URL: https://issues.apache.org/jira/browse/FLINK-35629 > Project: Flink > Issue Type: Bug > Components: Benchmarks >Affects Versions: 1.20.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Blocker > Fix For: 1.20.0 > > Attachments: image-2024-06-18-14-52-55-164.png > > > [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.ascii&extr=on&quarts=on&equid=off&env=3&revs=200] > [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.chinese&extr=on&quarts=on&equid=off&env=3&revs=200] > [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.russian&extr=on&quarts=on&equid=off&env=3&revs=200] > [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.128.chinese&extr=on&quarts=on&equid=off&env=3&revs=200] > [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.ascii&extr=on&quarts=on&equid=off&env=3&revs=200] > [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.chinese&extr=on&quarts=on&equid=off&env=3&revs=200] > [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.russian&extr=on&quarts=on&equid=off&env=3&revs=200] > !image-2024-06-18-14-52-55-164.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-16784) Support KeyedBroadcastProcessFunction state bootstrapping.
[ https://issues.apache.org/jira/browse/FLINK-16784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Fedulov reassigned FLINK-16784: - Assignee: Or Keren > Support KeyedBroadcastProcessFunction state bootstrapping. > --- > > Key: FLINK-16784 > URL: https://issues.apache.org/jira/browse/FLINK-16784 > Project: Flink > Issue Type: Improvement > Components: API / State Processor >Reporter: Seth Wiesman >Assignee: Or Keren >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-25646) Document buffer debloating issues with high parallelism
[ https://issues.apache.org/jira/browse/FLINK-25646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856252#comment-17856252 ] Jufang He edited comment on FLINK-25646 at 6/19/24 11:55 AM: - [~pnowojski] I would like to ask if there is any progress on this issue? I also saw significant performance degradation when testing buffer debloating (with Unaligned Checkpoint enabled). The following is some information about my test jobs. The test kafka source QPS is high, test jobs are all under backpressure and have lag. With buffer debloating enabled, the buffer size is smaller, the number of segments used is increased, and the total amount of in-flight data is significantly decreased, but the negative effect is also obvious, the throughput of the task is decreased by more than 30%. || ||*Buffer Debloating Disabled*||*Buffer Debloating Enabled*|| |parallelism|1350|1350| |checkpoint duration(avg)|2m 43s|2m 31s| |channel state size(avg)|65.9 GB|5.43 GB| |memory segment size|32K(default value)|256~5k (calculated by buffer debloating, most of them are 256)| |total memory segments per TM|115k|115k| |available memory segments per TM|63.2k|19.5k| |floating buffers per gate|8(default value)|2000| |*throughput(avg)*|374k|249k| was (Author: JIRAUSER302059): [~pnowojski] I would like to ask if there is any progress on this issue? I also saw significant performance degradation when testing buffer debloating (with Unaligned Checkpoint enabled). The following is some information about my test jobs. The test kafka source QPS is high, test jobs are all under backpressure and have lag. With buffer debloating enabled, the buffer size is smaller, the number of segments used is increased, and the total amount of in-flight data is significantly decreased, but the negative effect is also obvious, the throughput of the task is decreased by more than 30%. 
|| ||*Buffer Debloating Disabled*||*Buffer Debloating Enabled*|| |parallelism|1350|1350| |checkpoint duration(avg)|2m 43s|2m 31s| |channel state size(avg)|65.9 GB|5.43 GB| |memory segment size|32K(default value)|256~5k (calculated by buffer debloating, most of them are 256)| |total memory segments per TM|115k|115k| |available memory segments per TM|63.2k|19.5k| |floating buffers per gate|8(default value)|2000| |*throughput(avg)*|374k|249k| > Document buffer debloating issues with high parallelism > --- > > Key: FLINK-25646 > URL: https://issues.apache.org/jira/browse/FLINK-25646 > Project: Flink > Issue Type: Improvement > Components: Documentation, Runtime / Network >Affects Versions: 1.14.0 >Reporter: Anton Kalashnikov >Assignee: Anton Kalashnikov >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > According to last benchmarks, there are some problems with buffer debloat > when job has high parallelism. The high parallelism means the different value > from job to job but in general it is more than 200. So it makes sense to > document that problem and propose the solution - increasing the number of > buffers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25646) Document buffer debloating issues with high parallelism
[ https://issues.apache.org/jira/browse/FLINK-25646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856252#comment-17856252 ] Jufang He commented on FLINK-25646: --- [~pnowojski] I would like to ask if there is any progress on this issue? I also saw significant performance degradation when testing buffer debloating (with Unaligned Checkpoint enabled). The following is some information about my test jobs. The test kafka source QPS is high, test jobs are all under backpressure and have lag. With buffer debloating enabled, the buffer size is smaller, the number of segments used is increased, and the total amount of in-flight data is significantly decreased, but the negative effect is also obvious, the throughput of the task is decreased by more than 30%. || ||*Buffer Debloating Disabled*||*Buffer Debloating Enabled*|| |parallelism|1350|1350| |checkpoint duration(avg)|2m 43s|2m 31s| |channel state size(avg)|65.9 GB|5.43 GB| |memory segment size|32K(default value)|256~5k (calculated by buffer debloating, most of them are 256)| |total memory segments per TM|115k|115k| |available memory segments per TM|63.2k|19.5k| |floating buffers per gate|8(default value)|2000| |*throughput(avg)*|374k|249k| > Document buffer debloating issues with high parallelism > --- > > Key: FLINK-25646 > URL: https://issues.apache.org/jira/browse/FLINK-25646 > Project: Flink > Issue Type: Improvement > Components: Documentation, Runtime / Network >Affects Versions: 1.14.0 >Reporter: Anton Kalashnikov >Assignee: Anton Kalashnikov >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > According to last benchmarks, there are some problems with buffer debloat > when job has high parallelism. The high parallelism means the different value > from job to job but in general it is more than 200. So it makes sense to > document that problem and propose the solution - increasing the number of > buffers. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
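The figures in the comment above can be checked with a little arithmetic. The following is only a sketch: the class and method names are hypothetical, and the inputs are the averages copied from the table (throughput 374k vs. 249k, channel state size 65.9 GB vs. 5.43 GB). It confirms that the throughput drop is indeed a bit more than 30%, while the in-flight (channel state) data shrinks by roughly an order of magnitude.

```java
import java.util.Locale;

final class DebloatNumbers {
    // Relative decrease from 'before' to 'after', expressed as a percentage.
    static double percentDecrease(double before, double after) {
        return (before - after) / before * 100.0;
    }

    public static void main(String[] args) {
        // Average throughput: debloating disabled vs. enabled (values from the table).
        double throughput = percentDecrease(374_000, 249_000);
        // Average channel state size in GB: disabled vs. enabled.
        double channelState = percentDecrease(65.9, 5.43);

        System.out.printf(Locale.ROOT, "throughput decrease:    %.1f%%%n", throughput);
        System.out.printf(Locale.ROOT, "channel state decrease: %.1f%%%n", channelState);
    }
}
```

Both numbers come from a single pair of test jobs, so they describe this one setup (parallelism 1350, unaligned checkpoints) rather than a general rule.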
[jira] [Comment Edited] (FLINK-25646) Document buffer debloating issues with high parallelism
[ https://issues.apache.org/jira/browse/FLINK-25646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856252#comment-17856252 ] Jufang He edited comment on FLINK-25646 at 6/19/24 11:56 AM: - [~pnowojski] I would like to ask if there is any progress on this issue(FLINK-25688)? I also saw significant performance degradation when testing buffer debloating (with Unaligned Checkpoint enabled). The following is some information about my test jobs. The test kafka source QPS is high, test jobs are all under backpressure and have lag. With buffer debloating enabled, the buffer size is smaller, the number of segments used is increased, and the total amount of in-flight data is significantly decreased, but the negative effect is also obvious, the throughput of the task is decreased by more than 30%. || ||*Buffer Debloating Disabled*||*Buffer Debloating Enabled*|| |parallelism|1350|1350| |checkpoint duration(avg)|2m 43s|2m 31s| |channel state size(avg)|65.9 GB|5.43 GB| |memory segment size|32K(default value)|256~5k (calculated by buffer debloating, most of them are 256)| |total memory segments per TM|115k|115k| |available memory segments per TM|63.2k|19.5k| |floating buffers per gate|8(default value)|2000| |*throughput(avg)*|374k|249k| was (Author: JIRAUSER302059): [~pnowojski] I would like to ask if there is any progress on this issue? I also saw significant performance degradation when testing buffer debloating (with Unaligned Checkpoint enabled). The following is some information about my test jobs. The test kafka source QPS is high, test jobs are all under backpressure and have lag. With buffer debloating enabled, the buffer size is smaller, the number of segments used is increased, and the total amount of in-flight data is significantly decreased, but the negative effect is also obvious, the throughput of the task is decreased by more than 30%. 
|| ||*Buffer Debloating Disabled*||*Buffer Debloating Enabled*|| |parallelism|1350|1350| |checkpoint duration(avg)|2m 43s|2m 31s| |channel state size(avg)|65.9 GB|5.43 GB| |memory segment size|32K(default value)|256~5k (calculated by buffer debloating, most of them are 256)| |total memory segments per TM|115k|115k| |available memory segments per TM|63.2k|19.5k| |floating buffers per gate|8(default value)|2000| |*throughput(avg)*|374k|249k| > Document buffer debloating issues with high parallelism > --- > > Key: FLINK-25646 > URL: https://issues.apache.org/jira/browse/FLINK-25646 > Project: Flink > Issue Type: Improvement > Components: Documentation, Runtime / Network >Affects Versions: 1.14.0 >Reporter: Anton Kalashnikov >Assignee: Anton Kalashnikov >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > According to last benchmarks, there are some problems with buffer debloat > when job has high parallelism. The high parallelism means the different value > from job to job but in general it is more than 200. So it makes sense to > document that problem and propose the solution - increasing the number of > buffers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25646) Document buffer debloating issues with high parallelism
[ https://issues.apache.org/jira/browse/FLINK-25646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856273#comment-17856273 ] Piotr Nowojski commented on FLINK-25646: Hi [~hejufang001]. No, there has been no progress on this issue. > floating buffers per gate | 8 (default value) | 2000 Does this mean that with buffer debloating enabled, you increased the floating buffers from 8 to 2000? Also, have you tried setting a higher debloating target? AFAIR we observed that increasing the target from 1s (default) to 5s behaves much better when it comes to max throughput. > Document buffer debloating issues with high parallelism > --- > > Key: FLINK-25646 > URL: https://issues.apache.org/jira/browse/FLINK-25646 > Project: Flink > Issue Type: Improvement > Components: Documentation, Runtime / Network >Affects Versions: 1.14.0 >Reporter: Anton Kalashnikov >Assignee: Anton Kalashnikov >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > According to last benchmarks, there are some problems with buffer debloat > when job has high parallelism. The high parallelism means the different value > from job to job but in general it is more than 200. So it makes sense to > document that problem and propose the solution - increasing the number of > buffers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
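The settings discussed in the comment above could be written down roughly as follows. This is a sketch under assumptions: the option keys are the ones I recall from the Flink network memory tuning documentation and should be verified against the docs for your Flink version; the helper class is hypothetical and only renders flat `key: value` lines for a Flink configuration file. The value 2000 is the floating-buffer count from the test table, and 5s is the debloating target suggested in the comment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DebloatConfigSketch {
    // Overrides to try; keys are assumed from the Flink docs, verify before use.
    static Map<String, String> overrides() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("taskmanager.network.memory.buffer-debloat.enabled", "true");
        // Raise the debloating target from the 1s default to 5s.
        conf.put("taskmanager.network.memory.buffer-debloat.target", "5s");
        // Floating buffers per gate as used in the reported test (default is 8).
        conf.put("taskmanager.network.memory.floating-buffers-per-gate", "2000");
        return conf;
    }

    // Render the map as flat "key: value" lines, one option per line.
    static String render(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder();
        conf.forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(render(overrides()));
    }
}
```

Whether the higher target recovers the lost throughput in the reported setup is an open question in this thread; the sketch only shows which knobs are being talked about.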
[jira] [Assigned] (FLINK-35553) Integrate newly added trigger interface with checkpointing
[ https://issues.apache.org/jira/browse/FLINK-35553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl reassigned FLINK-35553: - Assignee: Matthias Pohl > Integrate newly added trigger interface with checkpointing > -- > > Key: FLINK-35553 > URL: https://issues.apache.org/jira/browse/FLINK-35553 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / Coordination >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Major > Labels: pull-request-available > > This connects the newly introduced trigger logic (FLINK-35551) with the > {{CheckpointStatsTracker}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35552) Move CheckpointStatsTracker out of ExecutionGraph into Scheduler
[ https://issues.apache.org/jira/browse/FLINK-35552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl reassigned FLINK-35552: - Assignee: Matthias Pohl > Move CheckpointStatsTracker out of ExecutionGraph into Scheduler > > > Key: FLINK-35552 > URL: https://issues.apache.org/jira/browse/FLINK-35552 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / Coordination >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Major > Labels: pull-request-available > > The scheduler needs to know about the CheckpointStatsTracker to allow > listening to checkpoint failures and completion. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35551) Introduces RescaleManager#onTrigger endpoint
[ https://issues.apache.org/jira/browse/FLINK-35551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl reassigned FLINK-35551: - Assignee: Matthias Pohl > Introduces RescaleManager#onTrigger endpoint > > > Key: FLINK-35551 > URL: https://issues.apache.org/jira/browse/FLINK-35551 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Major > Labels: pull-request-available > > The new endpoint would allow us to separate observing change events from > actually triggering the rescale operation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
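The idea in the ticket description, keeping "a change was observed" apart from "now is the moment to act on it", can be illustrated with a small sketch. Everything below is hypothetical: the interface, the class, and the method bodies are made up for illustration and are not Flink's actual RescaleManager code. Only the two method names echo the ticket title.

```java
final class RescaleSketch {
    // Hypothetical contract: observing a change and triggering are separate calls.
    interface Manager {
        void onChange();   // something changed that may justify a rescale
        void onTrigger();  // an external signal that acting now is acceptable
    }

    // Remembers that a change happened and rescales only when triggered.
    static final class DeferredManager implements Manager {
        private final Runnable rescale;
        private boolean changePending;
        private int rescaleCount;

        DeferredManager(Runnable rescale) {
            this.rescale = rescale;
        }

        @Override
        public void onChange() {
            changePending = true; // record the change, do not act yet
        }

        @Override
        public void onTrigger() {
            if (!changePending) {
                return; // nothing was observed since the last rescale
            }
            changePending = false;
            rescaleCount++;
            rescale.run();
        }

        int rescaleCount() {
            return rescaleCount;
        }
    }

    public static void main(String[] args) {
        DeferredManager manager = new DeferredManager(() -> System.out.println("rescaling"));
        manager.onTrigger(); // no change observed yet, nothing happens
        manager.onChange();
        manager.onChange();  // several changes collapse into one pending rescale
        manager.onTrigger(); // prints "rescaling" once
        System.out.println("rescales: " + manager.rescaleCount());
    }
}
```

With this split, the caller decides what counts as a trigger; the sketch does not assume any particular trigger source.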
[jira] [Commented] (FLINK-25538) [JUnit5 Migration] Module: flink-connector-kafka
[ https://issues.apache.org/jira/browse/FLINK-25538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856294#comment-17856294 ] Muhammet Orazov commented on FLINK-25538: - Hey all, I have created a new PR [https://github.com/apache/flink-connector-kafka/pull/106] (continuing from PR [https://github.com/apache/flink-connector-kafka/pull/66]). Please feel free to close PR #66. I'll continue applying changes on #106. > [JUnit5 Migration] Module: flink-connector-kafka > > > Key: FLINK-25538 > URL: https://issues.apache.org/jira/browse/FLINK-25538 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Reporter: Qingsheng Ren >Assignee: Muhammet Orazov >Priority: Minor > Labels: pull-request-available, stale-assigned, starter > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35650) Incorrect TIMESTAMP_LTZ type behavior in Table SQL
Andrey Gaskov created FLINK-35650: - Summary: Incorrect TIMESTAMP_LTZ type behavior in Table SQL Key: FLINK-35650 URL: https://issues.apache.org/jira/browse/FLINK-35650 Project: Flink Issue Type: Bug Components: Table SQL / API, Table SQL / Client, Table SQL / Runtime Affects Versions: 1.18.1, 1.17.2, 1.20.0 Environment: Local environment, Open Source Flink without modifications, the cluster started by ./bin/start-cluster.sh Reporter: Andrey Gaskov The file named /home/miron/tmp/data.csv contains a single line: {code:java} "1970-01-01 00:00:00Z" {code} Run the following commands in Flink SQL client: {code:java} Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau'; [INFO] Execute statement succeeded. Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai'; [INFO] Execute statement succeeded. Flink SQL> SET 'execution.runtime-mode' = 'batch'; [INFO] Execute statement succeeded. Flink SQL> > create table t_in ( > t timestamp_ltz > ) with ( > 'connector' = 'filesystem', > 'path' = '/home/miron/tmp/data.csv', > 'format' = 'csv' > ); [INFO] Execute statement succeeded. Flink SQL> select * from t_in; ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (1.33 seconds) {code} So far so good. The behavior corresponds to the specification. Run the following query: {code:java} Flink SQL> select TO_TIMESTAMP_LTZ(0, 0); +-+ | EXPR$0 | +-+ | 1970-01-01 08:00:00.000 | +-+ 1 row in set (0.36 seconds) {code} This is also correct. Zero point on the timeline corresponds 1970-01-01 00:00:00 at zero UTC offset which corresponds to 1970-01-01 08:00:00 at Asia/Shanghai time zone. Now things get worse: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(0, 0); Empty set (0.47 seconds) {code} *{color:#de350b}This is wrong.{color}* We should get the record as a result. 
We can work around it as follows: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(8*60*60, 0); ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (0.37 seconds) {code} Even though we get the record, we should not have to pass 8*60*60 to TO_TIMESTAMP_LTZ. But the most surprising result is the following: {code:java} Flink SQL> select * from t_in where t = TO_TIMESTAMP_LTZ(8*60*60, 0); ++ | t | ++ | 1970-01-01 16:00:00.00 | ++ 1 row in set (0.37 seconds) {code} *{color:#de350b}This is absolutely wrong.{color}* By changing the comparison operator from "<=" to "=" in the WHERE clause, we get a wrong time. We get the same behavior in Java: the result is an Instant object with a wrong value. Also, in Java I found more incorrect cases that could not be reproduced using the SQL Client. -- This message was sent by Atlassian Jira (v8.20.10#820010)
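The expectation behind this report can be spelled out with plain java.time, independent of Flink. The sketch below is only a model of the intended semantics (a TIMESTAMP_LTZ value is a point on the timeline, and the session time zone affects display only); it says nothing about how Flink implements the type. The class and helper names are hypothetical, and the CSV value is written in ISO form so that Instant.parse accepts it.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

final class TimestampLtzSketch {
    static final ZoneId SESSION_ZONE = ZoneId.of("Asia/Shanghai");

    // Render an instant as a session in the given zone would display it.
    static LocalDateTime display(Instant instant, ZoneId zone) {
        return LocalDateTime.ofInstant(instant, zone);
    }

    public static void main(String[] args) {
        // Same point in time as the CSV row "1970-01-01 00:00:00Z".
        Instant fromCsv = Instant.parse("1970-01-01T00:00:00Z");
        // Models TO_TIMESTAMP_LTZ(0, 0): epoch second 0.
        Instant epochZero = Instant.ofEpochSecond(0);
        // Models TO_TIMESTAMP_LTZ(8*60*60, 0): eight hours after the epoch.
        Instant shifted = Instant.ofEpochSecond(8 * 60 * 60);

        // Display depends on the session zone (local wall-clock time in Shanghai).
        System.out.println(display(fromCsv, SESSION_ZONE));
        // Comparison does not: t <= TO_TIMESTAMP_LTZ(0, 0) should match the row.
        System.out.println(!fromCsv.isAfter(epochZero));
        // t = TO_TIMESTAMP_LTZ(8*60*60, 0) should not match the row.
        System.out.println(fromCsv.equals(shifted));
    }
}
```

Under this model the `<=` query against epoch 0 returns the row and the `=` query against 8*60*60 returns nothing, which is the opposite of what the SQL client showed.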
[jira] [Updated] (FLINK-35650) Incorrect TIMESTAMP_LTZ type behavior in Table SQL
[ https://issues.apache.org/jira/browse/FLINK-35650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Gaskov updated FLINK-35650: -- Description: The file named /home/miron/tmp/data.csv contains a single line: {code:java} "1970-01-01 00:00:00Z" {code} Run the following commands in Flink SQL client: {code:java} Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau'; [INFO] Execute statement succeeded. Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai'; [INFO] Execute statement succeeded. Flink SQL> SET 'execution.runtime-mode' = 'batch'; [INFO] Execute statement succeeded. Flink SQL> > create table t_in ( > t timestamp_ltz > ) with ( > 'connector' = 'filesystem', > 'path' = '/home/miron/tmp/data.csv', > 'format' = 'csv' > ); [INFO] Execute statement succeeded. Flink SQL> select * from t_in; ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (1.33 seconds) {code} So far so good. The behavior corresponds to the specification. Run the following query: {code:java} Flink SQL> select TO_TIMESTAMP_LTZ(0, 0); +-+ | EXPR$0 | +-+ | 1970-01-01 08:00:00.000 | +-+ 1 row in set (0.36 seconds) {code} This is also correct. Zero point on the timeline corresponds 1970-01-01 00:00:00 at zero UTC offset which corresponds to 1970-01-01 08:00:00 at Asia/Shanghai time zone. Now things get worse: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(0, 0); Empty set (0.47 seconds) {code} *{color:#de350b}This is wrong.{color}* We should get the record as a result. We could fix it the following way: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(8*60*60, 0); ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (0.37 seconds) {code} Even though we got the record, we should not specify 8*60*60 parameter to TO_TIMESTAMP_LTZ. 
But the most ridiculous result is the following: {code:java} Flink SQL> select * from t_in where t = TO_TIMESTAMP_LTZ(8*60*60, 0); ++ | t | ++ | 1970-01-01 16:00:00.00 | ++ 1 row in set (0.37 seconds) {code} *{color:#de350b}This is absolutely wrong.{color}* By changing the comparison function from "<=" to "=" in the where clause we got wrong time. The same behavior we get in Java. The result is an object of Instance class with wrong value. Also, in Java I got more wrong cases that could not be reproduced using SQL Client. was: The file named /home/miron/tmp/data.csv contains a single line: {code:java} "1970-01-01 00:00:00Z" {code} Run the following commands in Flink SQL client: {code:java} Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau'; [INFO] Execute statement succeeded. Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai'; [INFO] Execute statement succeeded. Flink SQL> SET 'execution.runtime-mode' = 'batch'; [INFO] Execute statement succeeded. Flink SQL> > create table t_in ( > t timestamp_ltz > ) with ( > 'connector' = 'filesystem', > 'path' = '/home/miron/tmp/data.csv', > 'format' = 'csv' > ); [INFO] Execute statement succeeded. Flink SQL> select * from t_in; ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (1.33 seconds) {code} So far so good. The behavior corresponds to the specification. Run the following query: {code:java} Flink SQL> select TO_TIMESTAMP_LTZ(0, 0); +-+ | EXPR$0 | +-+ | 1970-01-01 08:00:00.000 | +-+ 1 row in set (0.36 seconds) {code} This is also correct. Zero point on the timeline corresponds 1970-01-01 00:00:00 at zero UTC offset which corresponds to 1970-01-01 08:00:00 at Asia/Shanghai time zone. Now things get worse: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(0, 0); Empty set (0.47 seconds) {code} *{color:#de350b}This is wrong.{color}* We should get the record as a result. 
We could fix it the following way: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(8*60*60, 0); ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (0.37 seconds) {code} Even though we got the record, we should not specify 8*60*60 parameter to TO_TIMESTAMP_LTZ. But the most ridiculous result is the following: {code:java} Flink SQL> select * from t_in where t = TO_TIMESTAMP_LTZ(8*60*60, 0); +-
[jira] [Updated] (FLINK-35650) Incorrect TIMESTAMP_LTZ type behavior in Table SQL
[ https://issues.apache.org/jira/browse/FLINK-35650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Gaskov updated FLINK-35650: -- Description: The file named /home/miron/tmp/data.csv contains a single line: {code:java} "1970-01-01 00:00:00Z" {code} Run the following commands in Flink SQL client: {code:java} Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau'; [INFO] Execute statement succeeded. Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai'; [INFO] Execute statement succeeded. Flink SQL> SET 'execution.runtime-mode' = 'batch'; [INFO] Execute statement succeeded. Flink SQL> > create table t_in ( > t timestamp_ltz > ) with ( > 'connector' = 'filesystem', > 'path' = '/home/miron/tmp/data.csv', > 'format' = 'csv' > ); [INFO] Execute statement succeeded. Flink SQL> select * from t_in; ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (1.33 seconds) {code} So far so good. The behavior corresponds to the specification. Run the following query: {code:java} Flink SQL> select TO_TIMESTAMP_LTZ(0, 0); +-+ | EXPR$0 | +-+ | 1970-01-01 08:00:00.000 | +-+ 1 row in set (0.36 seconds) {code} This is also correct. Zero point on the timeline corresponds 1970-01-01 00:00:00 at zero UTC offset which corresponds to 1970-01-01 08:00:00 at Asia/Shanghai time zone. Now things get worse: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(0, 0); Empty set (0.47 seconds) {code} *{color:#de350b}This is wrong.{color}* We should get the record as a result. We could fix it the following way: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(8*60*60, 0); ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (0.37 seconds) {code} Even though we got the record, we should not specify 8*60*60 parameter to TO_TIMESTAMP_LTZ. 
But the most ridiculous result is the following: {code:java} Flink SQL> select * from t_in where t = TO_TIMESTAMP_LTZ(8*60*60, 0); ++ | t | ++ | 1970-01-01 16:00:00.00 | ++ 1 row in set (0.37 seconds) {code} *{color:#de350b}This is absolutely wrong.{color}* By changing the comparison function from "<=" to "=" in the where clause we got wrong time. The same behavior we get in Java. The result is an object of Instant class with wrong value. Also, in Java I got more wrong cases that could not be reproduced using SQL Client. was: The file named /home/miron/tmp/data.csv contains a single line: {code:java} "1970-01-01 00:00:00Z" {code} Run the following commands in Flink SQL client: {code:java} Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau'; [INFO] Execute statement succeeded. Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai'; [INFO] Execute statement succeeded. Flink SQL> SET 'execution.runtime-mode' = 'batch'; [INFO] Execute statement succeeded. Flink SQL> > create table t_in ( > t timestamp_ltz > ) with ( > 'connector' = 'filesystem', > 'path' = '/home/miron/tmp/data.csv', > 'format' = 'csv' > ); [INFO] Execute statement succeeded. Flink SQL> select * from t_in; ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (1.33 seconds) {code} So far so good. The behavior corresponds to the specification. Run the following query: {code:java} Flink SQL> select TO_TIMESTAMP_LTZ(0, 0); +-+ | EXPR$0 | +-+ | 1970-01-01 08:00:00.000 | +-+ 1 row in set (0.36 seconds) {code} This is also correct. Zero point on the timeline corresponds 1970-01-01 00:00:00 at zero UTC offset which corresponds to 1970-01-01 08:00:00 at Asia/Shanghai time zone. Now things get worse: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(0, 0); Empty set (0.47 seconds) {code} *{color:#de350b}This is wrong.{color}* We should get the record as a result. 
We could fix it the following way: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(8*60*60, 0); ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (0.37 seconds) {code} Even though we got the record, we should not specify 8*60*60 parameter to TO_TIMESTAMP_LTZ. But the most ridiculous result is the following: {code:java} Flink SQL> select * from t_in where t = TO_TIMESTAMP_LTZ(8*60*60, 0); +
[jira] [Updated] (FLINK-35650) Incorrect TIMESTAMP_LTZ type behavior in Table SQL
[ https://issues.apache.org/jira/browse/FLINK-35650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Gaskov updated FLINK-35650: -- Description: The file named /home/miron/tmp/data.csv contains a single line: {code:java} "1970-01-01 00:00:00Z" {code} Run the following commands in Flink SQL client: {code:java} Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau'; [INFO] Execute statement succeeded. Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai'; [INFO] Execute statement succeeded. Flink SQL> SET 'execution.runtime-mode' = 'batch'; [INFO] Execute statement succeeded. Flink SQL> > create table t_in ( > t timestamp_ltz > ) with ( > 'connector' = 'filesystem', > 'path' = '/home/miron/tmp/data.csv', > 'format' = 'csv' > ); [INFO] Execute statement succeeded. Flink SQL> select * from t_in; ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (1.33 seconds) {code} So far so good. The behavior corresponds to the specification. Run the following query: {code:java} Flink SQL> select TO_TIMESTAMP_LTZ(0, 0); +-+ | EXPR$0 | +-+ | 1970-01-01 08:00:00.000 | +-+ 1 row in set (0.36 seconds) {code} This is also correct. Zero point on the timeline corresponds 1970-01-01 00:00:00 at zero UTC offset which corresponds to 1970-01-01 08:00:00 at Asia/Shanghai time zone. Now things get worse: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(0, 0); Empty set (0.47 seconds) {code} *{color:#de350b}This is wrong.{color}* We should get the record as a result. We could fix it the following way: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(8*60*60, 0); ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (0.37 seconds) {code} Even though we got the record, we should not specify 8*60*60 parameter to TO_TIMESTAMP_LTZ. 
But the most ridiculous result is the following: {code:java} Flink SQL> select * from t_in where t = TO_TIMESTAMP_LTZ(8*60*60, 0); ++ | t | ++ | 1970-01-01 16:00:00.00 | ++ 1 row in set (0.37 seconds) {code} *{color:#de350b}This is absolutely wrong.{color}* By changing the comparison function from "<=" to "=" in the where clause we got wrong time. The same behavior we get in Java. The result is an object of Instant class with wrong value. Also, in Java I got more wrong cases that could not be reproduced using SQL Client. was: The file named /home/miron/tmp/data.csv contains a single line: {code:java} "1970-01-01 00:00:00Z" {code} Run the following commands in Flink SQL client: {code:java} Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau'; [INFO] Execute statement succeeded. Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai'; [INFO] Execute statement succeeded. Flink SQL> SET 'execution.runtime-mode' = 'batch'; [INFO] Execute statement succeeded. Flink SQL> > create table t_in ( > t timestamp_ltz > ) with ( > 'connector' = 'filesystem', > 'path' = '/home/miron/tmp/data.csv', > 'format' = 'csv' > ); [INFO] Execute statement succeeded. Flink SQL> select * from t_in; ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (1.33 seconds) {code} So far so good. The behavior corresponds to the specification. Run the following query: {code:java} Flink SQL> select TO_TIMESTAMP_LTZ(0, 0); +-+ | EXPR$0 | +-+ | 1970-01-01 08:00:00.000 | +-+ 1 row in set (0.36 seconds) {code} This is also correct. Zero point on the timeline corresponds 1970-01-01 00:00:00 at zero UTC offset which corresponds to 1970-01-01 08:00:00 at Asia/Shanghai time zone. Now things get worse: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(0, 0); Empty set (0.47 seconds) {code} *{color:#de350b}This is wrong.{color}* We should get the record as a result. 
We could fix it the following way: {code:java} Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(8*60*60, 0); ++ | t | ++ | 1970-01-01 08:00:00.00 | ++ 1 row in set (0.37 seconds) {code} Even though we got the record, we should not specify 8*60*60 parameter to TO_TIMESTAMP_LTZ. But the most ridiculous result is the following: {code:java} Flink SQL> select * from t_in where t = TO_TIMESTAMP_LTZ(8*60*60, 0); +
[jira] [Commented] (FLINK-35650) Incorrect TIMESTAMP_LTZ type behavior in Table SQL
[ https://issues.apache.org/jira/browse/FLINK-35650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856316#comment-17856316 ] Dawid Wysakowicz commented on FLINK-35650: -- I think it is most likely related to https://issues.apache.org/jira/browse/FLINK-34938 > Incorrect TIMESTAMP_LTZ type behavior in Table SQL > -- > > Key: FLINK-35650 > URL: https://issues.apache.org/jira/browse/FLINK-35650 > Project: Flink > Issue Type: Bug > Components: Table SQL / API, Table SQL / Client, Table SQL / Runtime >Affects Versions: 1.17.2, 1.18.1, 1.20.0 > Environment: Local environment, Open Source Flink without > modifications, the cluster started by ./bin/start-cluster.sh >Reporter: Andrey Gaskov >Priority: Critical > > The file named /home/miron/tmp/data.csv contains a single line: > {code:java} > "1970-01-01 00:00:00Z" {code} > > Run the following commands in Flink SQL client: > {code:java} > Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau'; > [INFO] Execute statement succeeded. > Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai'; > [INFO] Execute statement succeeded. > Flink SQL> SET 'execution.runtime-mode' = 'batch'; > [INFO] Execute statement succeeded. > Flink SQL> > > create table t_in ( > > t timestamp_ltz > > ) with ( > > 'connector' = 'filesystem', > > 'path' = '/home/miron/tmp/data.csv', > > 'format' = 'csv' > > ); > [INFO] Execute statement succeeded. > Flink SQL> select * from t_in; > ++ > | t | > ++ > | 1970-01-01 08:00:00.00 | > ++ > 1 row in set (1.33 seconds) > {code} > So far so good. The behavior corresponds to the specification. > > Run the following query: > {code:java} > Flink SQL> select TO_TIMESTAMP_LTZ(0, 0); > +-+ > | EXPR$0 | > +-+ > | 1970-01-01 08:00:00.000 | > +-+ > 1 row in set (0.36 seconds) > {code} > This is also correct. Zero point on the timeline corresponds to 1970-01-01 > 00:00:00 at zero UTC offset which is 1970-01-01 08:00:00 at Asia/Shanghai > time zone. 
> > Now things get worse: > {code:java} > Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(0, 0); > Empty set (0.47 seconds) {code} > *{color:#de350b}This is wrong.{color}* We should get the record as a result. > > We could fix it the following way: > {code:java} > Flink SQL> select * from t_in where t <= TO_TIMESTAMP_LTZ(8*60*60, 0); > ++ > | t | > ++ > | 1970-01-01 08:00:00.00 | > ++ > 1 row in set (0.37 seconds) {code} > Even though we got the record, we should not specify 8*60*60 argument to > TO_TIMESTAMP_LTZ. > > But the most ridiculous result is the following: > {code:java} > Flink SQL> select * from t_in where t = TO_TIMESTAMP_LTZ(8*60*60, 0); > ++ > | t | > ++ > | 1970-01-01 16:00:00.00 | > ++ > 1 row in set (0.37 seconds) {code} > *{color:#de350b}This is absolutely wrong.{color}* By changing the comparison > function from "<=" to "=" in the where clause we got the wrong time ({*}16:00 > instead of 08:00{*}). > > The same behavior we get in Java. The result is an object of Instant class > with wrong value. Also, in Java I got more wrong cases that could not be > reproduced using SQL Client. -- This message was sent by Atlassian Jira (v8.20.10#820010)
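The expected semantics in the report above can be checked outside Flink with plain `java.time`. This is only a sketch of what the reporter expects, not Flink's implementation: the class and method names are made up for illustration, and the CSV value is written in ISO form (`T` separator) because `Instant.parse` requires it.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class EpochInShanghai {
    /** Renders an instant as wall-clock time in the given zone (hypothetical helper). */
    static LocalDateTime wallClock(Instant instant, String zoneId) {
        return LocalDateTime.ofInstant(instant, ZoneId.of(zoneId));
    }

    public static void main(String[] args) {
        // The instant that TO_TIMESTAMP_LTZ(0, 0) is expected to denote.
        Instant epoch = Instant.ofEpochSecond(0);
        // The instant that TO_TIMESTAMP_LTZ(8*60*60, 0) is expected to denote.
        Instant shifted = Instant.ofEpochSecond(8 * 60 * 60);
        // The single CSV record, rewritten in ISO-8601 form.
        Instant fromCsv = Instant.parse("1970-01-01T00:00:00Z");

        System.out.println(wallClock(epoch, "Asia/Shanghai"));   // 1970-01-01T08:00
        System.out.println(wallClock(shifted, "Asia/Shanghai")); // 1970-01-01T16:00
        // t <= TO_TIMESTAMP_LTZ(0, 0) should match the record.
        System.out.println(fromCsv.compareTo(epoch) <= 0);       // true
        // t = TO_TIMESTAMP_LTZ(8*60*60, 0) should not match it.
        System.out.println(fromCsv.equals(shifted));             // false
    }
}
```

Under these semantics the `<=` query should return the record and the `=` query should return nothing, which is the opposite of what the SQL client printed.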
[jira] [Commented] (FLINK-33142) Prometheus Sink Connector - Update Documentation
[ https://issues.apache.org/jira/browse/FLINK-33142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856331#comment-17856331 ] Lorenzo Nicora commented on FLINK-33142: [~Hong Teoh] can you please assign this to me? I will start working on the docs. > Prometheus Sink Connector - Update Documentation > > > Key: FLINK-33142 > URL: https://issues.apache.org/jira/browse/FLINK-33142 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Lorenzo Nicora >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35651) Unable to call argument-less built-in functions with parentheses
yux created FLINK-35651: --- Summary: Unable to call argument-less built-in functions with parentheses Key: FLINK-35651 URL: https://issues.apache.org/jira/browse/FLINK-35651 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Currently, the CDC Pipeline transform module handles some built-in functions specially, including LOCALTIME, LOCALTIMESTAMP, CURRENT_TIME, CURRENT_DATE, and CURRENT_TIMESTAMP. In the SystemFunctionUtils implementation they actually take arguments, which are silently bound to the current `__epoch_time__` and `__time_zone__` variables. However, users cannot call these functions with parentheses directly, because the function signature doesn't match. For example, ```yaml transform: - projection: CURRENT_TIMESTAMP AS TS ``` is OK, since it is automatically expanded to something like `currentTimestamp(__epoch_time__, __time_zone__)`. But for this definition: ```yaml transform: - projection: CURRENT_TIMESTAMP() AS TS ``` the transform will fail at runtime: > org.apache.calcite.runtime.CalciteContextException: From line 1, column 12 to > line 1, column 30: No match found for function signature CURRENT_TIMESTAMP() which could be quite confusing to users. -- This message was sent by Atlassian Jira (v8.20.10#820010)
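The mismatch described in this ticket can be illustrated with a toy lookup. This is a hedged sketch, not the Flink CDC or Calcite code: the registry, the `NAME/arity` key scheme, and both method names are hypothetical, and only the expanded call text and the "No match found" wording are taken from the report.

```java
import java.util.HashMap;
import java.util.Map;

public class HiddenArgsSketch {
    // Hypothetical registry keyed by "NAME/arity"; the real lookup is done by Calcite.
    private static final Map<String, String> SIGNATURES = new HashMap<>();

    static {
        // The implementation takes two arguments that users never write.
        SIGNATURES.put("CURRENT_TIMESTAMP/2", "currentTimestamp");
    }

    /** Bare identifier (no parentheses): the hidden context arguments are injected. */
    static String expandBare(String name) {
        String target = SIGNATURES.get(name + "/2");
        if (target == null) {
            throw new IllegalArgumentException("Unknown function " + name);
        }
        return target + "(__epoch_time__, __time_zone__)";
    }

    /** Explicit call: resolved strictly by the number of arguments the user wrote. */
    static String resolveCall(String name, int writtenArgs) {
        String target = SIGNATURES.get(name + "/" + writtenArgs);
        if (target == null) {
            throw new IllegalArgumentException(
                    "No match found for function signature " + name
                            + " with " + writtenArgs + " argument(s)");
        }
        return target;
    }

    public static void main(String[] args) {
        // CURRENT_TIMESTAMP AS TS: expanded with the hidden arguments.
        System.out.println(expandBare("CURRENT_TIMESTAMP"));
        try {
            // CURRENT_TIMESTAMP() AS TS: no zero-argument signature is registered.
            resolveCall("CURRENT_TIMESTAMP", 0);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this toy model the bare identifier succeeds because the expansion step supplies both arguments, while the parenthesized form is looked up with zero arguments and finds nothing.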
[jira] [Updated] (FLINK-35652) FLIP-462: Support Custom Data Distribution for Input Stream of Lookup Join
[ https://issues.apache.org/jira/browse/FLINK-35652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35652: --- Description: Ticket for [FLIP-462 Support Custom Data Distribution for Input Stream of Lookup Join|https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join] > FLIP-462: Support Custom Data Distribution for Input Stream of Lookup Join > -- > > Key: FLINK-35652 > URL: https://issues.apache.org/jira/browse/FLINK-35652 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Reporter: Weijie Guo >Priority: Major > > Ticket for [FLIP-462 Support Custom Data Distribution for Input Stream of > Lookup > Join|https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35652) FLIP-462: Support Custom Data Distribution for Input Stream of Lookup Join
Weijie Guo created FLINK-35652: -- Summary: FLIP-462: Support Custom Data Distribution for Input Stream of Lookup Join Key: FLINK-35652 URL: https://issues.apache.org/jira/browse/FLINK-35652 Project: Flink Issue Type: New Feature Components: Table SQL / Planner Reporter: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35653) "fraud detection" example missed "env.execute" explanation
[ https://issues.apache.org/jira/browse/FLINK-35653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856372#comment-17856372 ] Luke Chen commented on FLINK-35653: --- Working on the fix. > "fraud detection" example missed "env.execute" explanation > -- > > Key: FLINK-35653 > URL: https://issues.apache.org/jira/browse/FLINK-35653 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Luke Chen >Priority: Major > > On the "Try Flink" page, there is a [fraud detection with the DataStream > API|https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/datastream/] > example that demonstrates how to build a stateful streaming application with Flink’s > DataStream API. That page explains the code line by line, but skips > the last line: > {code:java} > env.execute("Fraud Detection");{code} > > I confirmed that this explanation was present as of > [v1.12|https://github.com/apache/flink/blob/release-1.12/docs/try-flink/datastream_api.md?plain=1#L365], > but the section was dropped when the Flink docs were migrated from Jekyll to Hugo in this > [PR|https://github.com/apache/flink/pull/14903]. Because of this > omission, first-time Flink users can overlook the line that actually executes the Flink > job. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35653) "fraud detection" example missed "env.execute" explanation
Luke Chen created FLINK-35653: - Summary: "fraud detection" example missed "env.execute" explanation Key: FLINK-35653 URL: https://issues.apache.org/jira/browse/FLINK-35653 Project: Flink Issue Type: Bug Components: Documentation Reporter: Luke Chen On the "Try Flink" page, there is a [fraud detection with the DataStream API|https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/datastream/] example that demonstrates how to build a stateful streaming application with Flink’s DataStream API. That page explains the code line by line, but skips the last line: {code:java} env.execute("Fraud Detection");{code} I confirmed that this explanation was present as of [v1.12|https://github.com/apache/flink/blob/release-1.12/docs/try-flink/datastream_api.md?plain=1#L365], but the section was dropped when the Flink docs were migrated from Jekyll to Hugo in this [PR|https://github.com/apache/flink/pull/14903]. Because of this omission, first-time Flink users can overlook the line that actually executes the Flink job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
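Why the last line matters can be shown with a toy model of lazy execution. Flink DataStream programs are built lazily and only run once `execute` is called; the class below is a hypothetical stand-in written for illustration and does not use the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class LazyPipelineSketch {
    // Steps are only recorded here; nothing runs while the pipeline is being built.
    private final List<String> steps = new ArrayList<>();
    private final List<String> log = new ArrayList<>();

    /** Registers a step, analogous to chaining source/process/sink calls. */
    LazyPipelineSketch addStep(String name) {
        steps.add(name);
        return this;
    }

    /** Runs every registered step, analogous to calling execute on the environment. */
    List<String> execute(String jobName) {
        for (String step : steps) {
            log.add(jobName + ": " + step);
        }
        return log;
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        LazyPipelineSketch env = new LazyPipelineSketch()
                .addStep("source")
                .addStep("fraud-detector")
                .addStep("sink");
        System.out.println(env.log().size()); // 0, nothing has run yet
        env.execute("Fraud Detection");
        System.out.println(env.log().size()); // 3, all steps ran
    }
}
```

A reader who copies the walkthrough but stops before the final call ends up in the first state: a fully described job that never runs.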
[jira] [Updated] (FLINK-35653) "fraud detection" example missed "env.execute" explanation
[ https://issues.apache.org/jira/browse/FLINK-35653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated FLINK-35653: -- Description: On the "Try Flink" page, there is a [fraud detection with the DataStream API|https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/datastream/#breaking-down-the-code] example that demonstrates how to build a stateful streaming application with Flink’s DataStream API. That page explains the code line by line, but skips the last line: {code:java} env.execute("Fraud Detection");{code} I confirmed that this explanation was present as of [v1.12|https://github.com/apache/flink/blob/release-1.12/docs/try-flink/datastream_api.md?plain=1#L365], but the section was dropped when the Flink docs were migrated from Jekyll to Hugo in this [PR|https://github.com/apache/flink/pull/14903]. Because of this omission, first-time Flink users can overlook the line that actually executes the Flink job. was: On the "Try Flink" page, there is a [fraud detection with the DataStream API|https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/datastream/] example that demonstrates how to build a stateful streaming application with Flink’s DataStream API. That page explains the code line by line, but skips the last line: {code:java} env.execute("Fraud Detection");{code} I confirmed that this explanation was present as of [v1.12|https://github.com/apache/flink/blob/release-1.12/docs/try-flink/datastream_api.md?plain=1#L365], but the section was dropped when the Flink docs were migrated from Jekyll to Hugo in this [PR|https://github.com/apache/flink/pull/14903]. Because of this omission, first-time Flink users can overlook the line that actually executes the Flink job. 
> "fraud detection" example missed "env.execute" explanation > -- > > Key: FLINK-35653 > URL: https://issues.apache.org/jira/browse/FLINK-35653 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Luke Chen >Priority: Major > > On the "Try Flink" page, there is a [fraud detection with the DataStream > API|https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/datastream/#breaking-down-the-code] > example that demonstrates how to build a stateful streaming application with Flink’s > DataStream API. That page explains the code line by line, but skips > the last line: > {code:java} > env.execute("Fraud Detection");{code} > > I confirmed that this explanation was present as of > [v1.12|https://github.com/apache/flink/blob/release-1.12/docs/try-flink/datastream_api.md?plain=1#L365], > but the section was dropped when the Flink docs were migrated from Jekyll to Hugo in this > [PR|https://github.com/apache/flink/pull/14903]. Because of this > omission, first-time Flink users can overlook the line that actually executes the Flink > job. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-33936) Outputting Identical Results in Mini-Batch Aggregation with Set TTL
[ https://issues.apache.org/jira/browse/FLINK-33936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856374#comment-17856374 ] Weijie Guo edited comment on FLINK-33936 at 6/20/24 2:56 AM: - master(1.20) via d661ec2983ef249911698a1d7ebbb7ec58a9e62a. was (Author: weijie guo): master(1.20) via https://github.com/apache/flink/commit/d661ec2983ef249911698a1d7ebbb7ec58a9e62a. > Outputting Identical Results in Mini-Batch Aggregation with Set TTL > --- > > Key: FLINK-33936 > URL: https://issues.apache.org/jira/browse/FLINK-33936 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Feng Jin >Assignee: Feng Jin >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > If mini-batch is enabled currently, and if the aggregated result is the same > as the previous output, this current aggregation result will not be sent > downstream. This will cause downstream nodes to not receive updated data. If > there is a TTL set for states at this time, the TTL of downstream will not be > updated either. > The specific logic is as follows. 
> https://github.com/apache/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224 > {code:java} > if (!equaliser.equals(prevAggValue, newAggValue)) { > // new row is not same with prev row > if (generateUpdateBefore) { > // prepare UPDATE_BEFORE message for previous row > resultRow > .replace(currentKey, prevAggValue) > .setRowKind(RowKind.UPDATE_BEFORE); > out.collect(resultRow); > } > // prepare UPDATE_AFTER message for new row > resultRow.replace(currentKey, > newAggValue).setRowKind(RowKind.UPDATE_AFTER); > out.collect(resultRow); > } > // new row is same with prev row, no need to output > {code} > When mini-batch is not enabled, even if the aggregation result of this time > is the same as last time, new results will still be sent if TTL is set. > https://github.com/apache/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170 > {code:java} > if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, > newAggValue)) { > // newRow is the same as before and state cleaning is not > enabled. > // We do not emit retraction and acc message. > // If state cleaning is enabled, we have to emit messages > to prevent too early > // state eviction of downstream operators. 
> return; > } else { > // retract previous result > if (generateUpdateBefore) { > // prepare UPDATE_BEFORE message for previous row > resultRow > .replace(currentKey, prevAggValue) > .setRowKind(RowKind.UPDATE_BEFORE); > out.collect(resultRow); > } > // prepare UPDATE_AFTER message for new row > resultRow.replace(currentKey, > newAggValue).setRowKind(RowKind.UPDATE_AFTER); > } > {code} > Therefore, based on the consideration of TTL scenarios, I believe that when > mini-batch aggregation is enabled, new results should also output when the > aggregated result is the same as the previous one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
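The difference between the two quoted code paths comes down to one boolean decision. The sketch below restates it in plain Java under stated assumptions: the class and method names are hypothetical, `resultChanged` stands for `!equaliser.equals(prevAggValue, newAggValue)`, and the "proposed" rule is the reporter's suggestion as described in the ticket, not necessarily the exact fix that was merged.

```java
public class EmitDecisionSketch {
    /** Mini-batch rule quoted in the ticket: emit only when the aggregate changed. */
    static boolean miniBatchEmitsBeforeFix(boolean resultChanged) {
        return resultChanged;
    }

    /**
     * Non-mini-batch rule quoted in the ticket: output is suppressed only when the
     * result is unchanged AND state retention (TTL) is disabled.
     */
    static boolean groupAggEmits(boolean resultChanged, long stateRetentionTime) {
        return resultChanged || stateRetentionTime > 0;
    }

    /** Reporter's proposal: let mini-batch aggregation follow the same rule. */
    static boolean miniBatchEmitsProposed(boolean resultChanged, long stateRetentionTime) {
        return groupAggEmits(resultChanged, stateRetentionTime);
    }

    public static void main(String[] args) {
        long ttlMillis = 60_000L; // hypothetical retention time
        // Unchanged result with TTL set: the case the ticket is about.
        System.out.println(miniBatchEmitsBeforeFix(false));           // false
        System.out.println(groupAggEmits(false, ttlMillis));          // true
        System.out.println(miniBatchEmitsProposed(false, ttlMillis)); // true
        // Unchanged result without TTL: suppressed in all variants.
        System.out.println(groupAggEmits(false, 0L));                 // false
    }
}
```

Only the unchanged-result-with-TTL case differs between the two operators, and it is this case that leaves downstream state TTL unrefreshed.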
[jira] [Closed] (FLINK-33936) Outputting Identical Results in Mini-Batch Aggregation with Set TTL
[ https://issues.apache.org/jira/browse/FLINK-33936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-33936. -- Fix Version/s: 1.20.0 Resolution: Fixed master(1.20) via https://github.com/apache/flink/commit/d661ec2983ef249911698a1d7ebbb7ec58a9e62a. > Outputting Identical Results in Mini-Batch Aggregation with Set TTL > --- > > Key: FLINK-33936 > URL: https://issues.apache.org/jira/browse/FLINK-33936 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Feng Jin >Assignee: Feng Jin >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > If mini-batch is enabled currently, and if the aggregated result is the same > as the previous output, this current aggregation result will not be sent > downstream. This will cause downstream nodes to not receive updated data. If > there is a TTL set for states at this time, the TTL of downstream will not be > updated either. > The specific logic is as follows. > https://github.com/apache/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224 > {code:java} > if (!equaliser.equals(prevAggValue, newAggValue)) { > // new row is not same with prev row > if (generateUpdateBefore) { > // prepare UPDATE_BEFORE message for previous row > resultRow > .replace(currentKey, prevAggValue) > .setRowKind(RowKind.UPDATE_BEFORE); > out.collect(resultRow); > } > // prepare UPDATE_AFTER message for new row > resultRow.replace(currentKey, > newAggValue).setRowKind(RowKind.UPDATE_AFTER); > out.collect(resultRow); > } > // new row is same with prev row, no need to output > {code} > When mini-batch is not enabled, even if the aggregation result of this time > is the same as last time, new results will still be sent if TTL is set. 
> https://github.com/apache/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170 > {code:java} > if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, > newAggValue)) { > // newRow is the same as before and state cleaning is not enabled. > // We do not emit retraction and acc message. > // If state cleaning is enabled, we have to emit messages to prevent too early > // state eviction of downstream operators. > return; > } else { > // retract previous result > if (generateUpdateBefore) { > // prepare UPDATE_BEFORE message for previous row > resultRow > .replace(currentKey, prevAggValue) > .setRowKind(RowKind.UPDATE_BEFORE); > out.collect(resultRow); > } > // prepare UPDATE_AFTER message for new row > resultRow.replace(currentKey, > newAggValue).setRowKind(RowKind.UPDATE_AFTER); > } > {code} > Therefore, considering TTL scenarios, I believe that when > mini-batch aggregation is enabled, new results should also be output when the > aggregated result is the same as the previous one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
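The difference between the two quoted code paths can be reduced to a single boolean decision. The following is a minimal sketch of that decision only, not Flink code: the class and method names are hypothetical, and the "proposed" rule simply assumes that the mini-batch path would adopt the same condition as the quoted GroupAggFunction snippet, which is what the reporter suggests.

```java
/** Hypothetical, simplified model of the emit decision discussed in FLINK-33936. */
class EmitDecisionSketch {

    /**
     * Rule from the quoted GroupAggFunction snippet: output is suppressed only
     * when state retention (TTL) is disabled AND the aggregate value is unchanged.
     */
    static boolean emitWithoutMiniBatch(long stateRetentionTime, boolean valueUnchanged) {
        return !(stateRetentionTime <= 0 && valueUnchanged);
    }

    /**
     * Rule from the quoted MiniBatchGroupAggFunction snippet: output is emitted
     * only when the aggregate value changed, regardless of TTL.
     */
    static boolean emitWithMiniBatchAsReported(boolean valueUnchanged) {
        return !valueUnchanged;
    }

    /**
     * Assumption: the proposal is modelled as "use the same rule as the
     * non-mini-batch path". The actual fix may be implemented differently.
     */
    static boolean emitWithMiniBatchAsProposed(long stateRetentionTime, boolean valueUnchanged) {
        return emitWithoutMiniBatch(stateRetentionTime, valueUnchanged);
    }

    public static void main(String[] args) {
        long ttlMillis = 60_000L; // hypothetical retention time, TTL enabled
        boolean unchanged = true; // new aggregate equals the previous one
        System.out.println("reported: " + emitWithMiniBatchAsReported(unchanged));
        System.out.println("proposed: " + emitWithMiniBatchAsProposed(ttlMillis, unchanged));
    }
}
```

With TTL enabled and an unchanged value, the reported rule stays silent while the proposed rule emits, which is the case where downstream state would otherwise not have its TTL refreshed.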
[jira] [Commented] (FLINK-35653) "fraud detection" example missed "env.execute" explanation
[ https://issues.apache.org/jira/browse/FLINK-35653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856375#comment-17856375 ] Luke Chen commented on FLINK-35653: --- PR: https://github.com/apache/flink/pull/24959 > "fraud detection" example missed "env.execute" explanation > -- > > Key: FLINK-35653 > URL: https://issues.apache.org/jira/browse/FLINK-35653 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Luke Chen >Priority: Major > > On the "Try Flink" page, there is a [fraud detection with the DataStream > API|https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/datastream/#breaking-down-the-code] > example that demonstrates how to build a stateful streaming application with Flink’s > DataStream API. The page explains the code line by line, but misses > the last line: > {code:java} > env.execute("Fraud Detection");{code} > > I confirmed that this explanation was still present in > [v1.12|https://github.com/apache/flink/blob/release-1.12/docs/try-flink/datastream_api.md?plain=1#L365], > but when the Flink docs were migrated from Jekyll to Hugo in this > [PR|https://github.com/apache/flink/pull/14903], that section was dropped. > Because of this omission, first-time Flink users may miss the important line that executes the Flink > job. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35654) Add Flink CDC verification guide in docs
yux created FLINK-35654: --- Summary: Add Flink CDC verification guide in docs Key: FLINK-35654 URL: https://issues.apache.org/jira/browse/FLINK-35654 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Currently, the ASF voting process requires extensive quality verification before any new version is released, including: * Tarball checksum verification * Compiling from source code * Running pipeline E2E tests * Running migration tests * Checking that the jar was packaged with the correct JDK version * ... Adding a verification guide to the Flink CDC docs should help developers verify future releases more easily. -- This message was sent by Atlassian Jira (v8.20.10#820010)
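As an illustration of the first item in that list, a checksum check could look roughly like the sketch below. It is not taken from the Flink CDC release process: the class and method names are hypothetical, and SHA-512 is an assumption about which digest the published checksum file uses.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper illustrating tarball checksum verification. */
class ChecksumSketch {

    /** Returns the lowercase hex SHA-512 digest of the given bytes. */
    static String sha512Hex(byte[] data) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-512");
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest(data)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-512 is a required algorithm on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    /** Compares the computed digest with the published one, ignoring case and surrounding whitespace. */
    static boolean matches(byte[] data, String publishedHex) {
        return sha512Hex(data).equalsIgnoreCase(publishedHex.trim());
    }

    public static void main(String[] args) {
        // Stand-in for the tarball contents; a real check would read the file bytes instead.
        byte[] artifact = "example artifact bytes".getBytes(StandardCharsets.UTF_8);
        String published = sha512Hex(artifact);
        System.out.println("checksum ok: " + matches(artifact, published));
    }
}
```

In a real verification the published value would come from the checksum file shipped next to the release candidate, not from the artifact itself.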
[jira] [Commented] (FLINK-35630) Kafka source may reset the consume offset to earliest when the partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-35630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856378#comment-17856378 ] tanjialiang commented on FLINK-35630: - [~martijnvisser] Your point makes a lot of sense. In theory, the Kafka consumer only consumes data at offsets below the high watermark. The problem I currently have is that after the partition leader changes, the consumer offset goes out of range. Our Kafka cluster is currently on version 2.3.0, and after much troubleshooting we think we are hitting https://issues.apache.org/jira/browse/KAFKA-9835. > Kafka source may reset the consume offset to earliest when the partition > leader changes > --- > > Key: FLINK-35630 > URL: https://issues.apache.org/jira/browse/FLINK-35630 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: kafka-3.2.0 >Reporter: tanjialiang >Priority: Major > > A Kafka producer writes data to a topic using the acks=1 option. > The Flink Kafka source starts up with the *scan.startup.mode=earliest-offset* > option (the Kafka *auto.offset.reset* option is forcibly > overridden to earliest). > If a partition leader becomes unavailable, a follower may become the new leader, > and this may trigger log truncation. Consumers may then find their offset > out of range and fall back to the *auto.offset.reset* strategy to reset it. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
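The failure mode described in this issue can be illustrated with a small model. This is a simplified sketch, not Kafka client or Flink connector code: the class, enum, and method names are hypothetical, and the valid range is modelled as the closed interval between the log start offset and the log end offset.

```java
/** Hypothetical model of how an out-of-range position is resolved by a reset strategy. */
class OffsetResetSketch {

    enum ResetStrategy { EARLIEST, LATEST }

    /**
     * Returns the position the consumer continues from.
     * A position inside [logStart, logEnd] is kept; anything else is treated as
     * out of range and replaced according to the reset strategy.
     */
    static long resolvePosition(long position, long logStart, long logEnd, ResetStrategy strategy) {
        boolean inRange = position >= logStart && position <= logEnd;
        if (inRange) {
            return position;
        }
        return strategy == ResetStrategy.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        long position = 1_000L;        // consumer had read up to offset 1000 on the old leader
        long logStart = 0L;
        long logEndAfterChange = 950L; // new leader's log is shorter after truncation

        // With the strategy forced to EARLIEST, the consumer jumps back to the log start
        // and re-reads the partition from the beginning.
        long resumed = resolvePosition(position, logStart, logEndAfterChange, ResetStrategy.EARLIEST);
        System.out.println("resumed at offset " + resumed);
    }
}
```

In this model the reset to earliest is a direct consequence of the position exceeding the truncated log end, which matches the sequence of events the reporter describes.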
[jira] [Created] (FLINK-35655) Support more pipeline sources
linqigeng created FLINK-35655: - Summary: Support more pipeline sources Key: FLINK-35655 URL: https://issues.apache.org/jira/browse/FLINK-35655 Project: Flink Issue Type: New Feature Components: Flink CDC Affects Versions: cdc-3.1.1 Reporter: linqigeng Currently only the MySQL pipeline source is supported. More source implementations are expected, such as MongoDB, Oracle, pg... -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35642) parsing the configuration file in flink, when configuring the s3 key, if there is contain # char, the correct key will be split
[ https://issues.apache.org/jira/browse/FLINK-35642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] blackpighe updated FLINK-35642: --- Fix Version/s: 1.18.2 (was: 1.19.1) > parsing the configuration file in flink, when configuring the s3 key, if > there is contain # char, the correct key will be split > --- > > Key: FLINK-35642 > URL: https://issues.apache.org/jira/browse/FLINK-35642 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.3, 1.19.0, 1.20.0, 1.19.1 >Reporter: blackpighe >Priority: Major > Fix For: 1.18.2 > > Attachments: image-2024-06-19-11-45-05-871.png > > > When Flink parses the configuration file and the configured S3 key > contains a # character, the key is split at that character. > > For example: > > s3.secret-key: Minio#dct@78 > > > !image-2024-06-19-11-45-05-871.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
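The reported symptom is what one would expect from a line-based parser that treats everything after a # as a comment. The sketch below reproduces that behaviour under this assumption; the class and method names are hypothetical and it is not presented as Flink's actual parsing code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical line-based "key: value" parser that strips '#' comments naively. */
class HashInValueSketch {

    static Map<String, String> parseNaively(String content) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String line : content.split("\n")) {
            // Everything after the first '#' is dropped, even inside a value.
            String withoutComment = line.split("#", 2)[0].trim();
            if (withoutComment.isEmpty()) {
                continue; // blank line or pure comment line
            }
            String[] keyValue = withoutComment.split(": ", 2);
            if (keyValue.length == 2) {
                result.put(keyValue[0].trim(), keyValue[1].trim());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String config = "# object store credentials\n" + "s3.secret-key: Minio#dct@78\n";
        // The value is cut at '#', so only the part before it survives.
        System.out.println(parseNaively(config).get("s3.secret-key"));
    }
}
```

Under this model the secret is silently truncated to the part before the # and no error is raised, which would explain why the problem is hard to locate.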
[jira] [Commented] (FLINK-35642) parsing the configuration file in flink, when configuring the s3 key, if there is contain # char, the correct key will be split
[ https://issues.apache.org/jira/browse/FLINK-35642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856397#comment-17856397 ] blackpighe commented on FLINK-35642: [~JunRuiLi] This feature is only supported in 1.19, and I'm using a lower version. I want to solve this problem on 1.18. Can we discuss it? > parsing the configuration file in flink, when configuring the s3 key, if > there is contain # char, the correct key will be split > --- > > Key: FLINK-35642 > URL: https://issues.apache.org/jira/browse/FLINK-35642 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.3, 1.19.0, 1.20.0, 1.19.1 >Reporter: blackpighe >Priority: Major > Fix For: 1.18.2 > > Attachments: image-2024-06-19-11-45-05-871.png > > > When Flink parses the configuration file and the configured S3 key > contains a # character, the key is split at that character. > > For example: > > s3.secret-key: Minio#dct@78 > > > !image-2024-06-19-11-45-05-871.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35642) parsing the configuration file in flink, when configuring the s3 key, if there is contain # char, the correct key will be split
[ https://issues.apache.org/jira/browse/FLINK-35642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856398#comment-17856398 ] blackpighe commented on FLINK-35642: The main reason is that the problem is deeply hidden and difficult to locate. > parsing the configuration file in flink, when configuring the s3 key, if > there is contain # char, the correct key will be split > --- > > Key: FLINK-35642 > URL: https://issues.apache.org/jira/browse/FLINK-35642 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.3, 1.19.0, 1.20.0, 1.19.1 >Reporter: blackpighe >Priority: Major > Fix For: 1.18.2 > > Attachments: image-2024-06-19-11-45-05-871.png > > > When Flink parses the configuration file and the configured S3 key > contains a # character, the key is split at that character. > > For example: > > s3.secret-key: Minio#dct@78 > > > !image-2024-06-19-11-45-05-871.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)