Unsubscribe
Unsubscribe
Is Flink SQL backward compatible?
Hi all, a quick question: we are planning to upgrade our Flink version and would like to know whether the SQL of existing jobs will still be compatible with the new version. For example, if I upgrade to 1.13, will my 1.10 SQL syntax still be accepted? Thanks. | Chuang Li | jasonlee1...@163.com |
Re: How to join 3 Kafka topics in Flink SQL and run windowed aggregations on the result?
Hi! A stream-stream join does indeed drop the time attribute fields. If some of the tables change little, you can rewrite those joins as lookup (dimension table) joins, which keep the time attribute. If every table changes frequently, consider the newly introduced event-time temporal join [1]; that should definitely cover this use case.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join

On Tue, Aug 10, 2021 at 9:35 AM, wang guanglei wrote:
> Fellow practitioners, I have three Kafka topics (device, consumer, order) and want Flink SQL to compute several aggregate metrics, e.g. the order count per deviceId over the past 12 hours. After joining the three tables into a wide view, the windowed aggregation never produces output, because the time-attribute columns lose their time-attribute property in the join. How should a multi-topic join followed by an aggregation be done in Flink SQL?
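A minimal sketch of the event-time temporal join suggested above, assuming an orders fact stream with an event-time attribute order_time has already been registered; the table names, columns, topics and connector options are hypothetical. The dimension side must be a versioned table (primary key plus watermark), which upsert-kafka provides, and the join result keeps order_time as a time attribute so it can still feed a window afterwards:

-- versioned dimension table: primary key + watermark make it usable in an event-time temporal join
CREATE TABLE device (
  deviceId STRING,
  device_name STRING,
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
  PRIMARY KEY (deviceId) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'device',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- each order row is joined against the device version that was valid at the order's event time
SELECT o.deviceId, o.order_time, d.device_name
FROM orders AS o
LEFT JOIN device FOR SYSTEM_TIME AS OF o.order_time AS d
ON o.deviceId = d.deviceId;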
Re: [ANNOUNCE] Apache Flink 1.13.2 released
Thanks Yun and everyone~!

Thank you~

Xintong Song

On Mon, Aug 9, 2021 at 10:14 PM Till Rohrmann wrote:
> Thanks Yun Tang for being our release manager and the great work! Also thanks a lot to everyone who contributed to this release.
>
> Cheers,
> Till
>
> On Mon, Aug 9, 2021 at 9:48 AM Yu Li wrote:
>> Thanks Yun Tang for being our release manager and everyone else who made the release possible!
>>
>> Best Regards,
>> Yu
How to join 3 Kafka topics in Flink SQL and run windowed aggregations on the result?
Fellow practitioners, hello. A question: I have three Kafka topics, device, consumer and order, and I want to compute several aggregate metrics with Flink SQL, for example the number of orders per deviceId over the past 12 hours. My design is:

1. Register table1, table2 and table3 via CREATE TABLE ... WITH (... = 'kafka') ..., specifying the event-time attribute and watermark (see the DDL sketch after this message).

2. Join the three tables:

create temporary view wide_table as (
  select ***
  from table1
  join table2 on ...
  join table3 on ...
)

3. Run the aggregation on the wide table:

select deviceId
  ,HOP_START(voc.timestamp, INTERVAL '5' SECOND, INTERVAL '10' SECOND)
  ,count(1) as cnt
from wide_table as voc
group by HOP(voc.timestamp, INTERVAL '5' SECOND, INTERVAL '10' SECOND)
  ,deviceId

After the job starts, it never produces any aggregation results, although select * from wide_table shows the detail rows.

After quite a lot of searching I found that the time-attribute columns of table1/2/3 lose their time-attribute property after the join, so they can no longer be used in a window.

How should a join of multiple topics followed by an aggregation be implemented with Flink SQL? Or am I using it the wrong way?

Thanks a lot!
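For step 1 above, a minimal sketch of such a Kafka source DDL with an event-time attribute and watermark; the topic, columns and connector options are hypothetical and only illustrate where the time attribute comes from:

CREATE TABLE table1 (
  deviceId STRING,
  `timestamp` TIMESTAMP(3),
  -- declares `timestamp` as an event-time attribute with a 5-second out-of-orderness bound
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'device',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'device-consumer',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);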
Re: [ANNOUNCE] Apache Flink 1.13.2 released
Thanks Yun Tang for being our release manager and the great work! Also thanks a lot to everyone who contributed to this release.

Cheers,
Till

On Mon, Aug 9, 2021 at 9:48 AM Yu Li wrote:
> Thanks Yun Tang for being our release manager and everyone else who made the release possible!
>
> Best Regards,
> Yu
Re: Re: Re: Re: Duplicate messages after restarting a job from a savepoint (Flink consuming Kafka)
From what I can see, Flink SQL does not enable transactions when writing to Kafka either. That is the part I really cannot figure out.

On Mon, Aug 9, 2021 at 5:56 PM, 东东 wrote:
> Duplicates only show up when there are uncommitted transactions. Maybe all transactions happened to have been committed at the moment you restarted, for example because no new data had come into the stream for a while.
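For context, a hedged sketch of where transactions come in when Flink SQL writes to Kafka (1.12/1.13 Kafka SQL connector): the sink option 'sink.semantic' defaults to 'at-least-once', i.e. no Kafka transactions, and setting it to 'exactly-once' makes the sink write inside transactions that are committed when a checkpoint completes. The table name, columns and remaining options below are hypothetical:

CREATE TABLE topic_b_sink (
  deviceId STRING,
  partitionId INT,
  `offset` BIGINT,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_B',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json',
  -- default is 'at-least-once'; 'exactly-once' turns on transactional writes
  'sink.semantic' = 'exactly-once',
  -- must not exceed the broker's transaction.max.timeout.ms
  'properties.transaction.timeout.ms' = '900000'
);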
Re: Re: Re: Re: Duplicate messages after restarting a job from a savepoint (Flink consuming Kafka)
Duplicates only show up when there are uncommitted transactions. Maybe all transactions happened to have been committed at the moment you restarted, for example because no new data had come into the stream for a while.

On Mon, Aug 9, 2021 at 5:44 PM, Jim Chen wrote:
> Here is something odd: I implemented the same logic with Flink SQL, *without* setting isolation.level to read_committed, restarted with stop-with-savepoint, and got no duplicates. My understanding is that without the isolation level set, Flink SQL should produce duplicates. Tell me why..
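A hedged sketch of the read_committed setup suggested above, for the downstream job that consumes topic B (for example before writing it to Hive): the Kafka SQL connector forwards 'properties.*' options to the underlying consumer, so the isolation level can be set there. The table name, columns and other options are hypothetical:

CREATE TABLE topic_b_source (
  deviceId STRING,
  partitionId INT,
  `offset` BIGINT,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_B',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'topic-b-to-hive',
  -- skip messages from aborted or not-yet-committed transactions
  'properties.isolation.level' = 'read_committed',
  'format' = 'json',
  'scan.startup.mode' = 'group-offsets'
);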
Re: Re: Re: Duplicate messages after restarting a job from a savepoint (Flink consuming Kafka)
Here is something odd: I implemented the logic above with Flink SQL, *without* setting isolation.level to read_committed, restarted the job with stop-with-savepoint, and there were no duplicates.

My understanding is that without the isolation level set, Flink SQL should produce duplicates. Tell me why..

On Mon, Aug 2, 2021 at 7:13 PM, 东东 wrote:
> The job that writes topic B to Hive in real time needs isolation.level set to read_committed; otherwise it will also read messages from transactions that have not been committed, or that were even aborted, and then duplicates are hard to avoid.

On Mon, Aug 2, 2021 at 7:00 PM, Jim Chen wrote:
> I don't quite understand what "is the downstream isolation.level read_committed" means. I write the partitionId and offset of topic A into the message body, the Flink program writes the messages to the downstream topic B, topic B is written to Hive in real time, and in the Hive table I deduplicate by partitionId and offset; that is where I found the duplicate consumption.

On Mon, Aug 2, 2021 at 6:20 PM, 东东 wrote:
> How did you detect the duplicate data downstream? Is the downstream consumer's isolation.level read_committed?

On Mon, Aug 2, 2021 at 6:14 PM, Jim Chen wrote:
> Hi 刘建刚, I used stop with savepoint, but I still see duplicate data downstream.
> Stop command:
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
> -yid application_1625497885855_703064 \
> -p hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
> -d 55e7ebb6fa38faaba61b4b9a7cd89827
>
> Restart command:
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> -m yarn-cluster \
> -yjm 4096 -ytm 4096 \
> -ynm User_Click_Log_Split_All \
> -yqu syh_offline \
> -ys 2 \
> -d \
> -p 64 \
> -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5 \
> -n \
> -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar

On Mon, Aug 2, 2021 at 3:49 PM, 刘建刚 wrote:
> cancel with savepoint is the older interface and can cause duplicate Kafka data. The newer stop with savepoint stops emitting data while the savepoint is taken, which avoids the duplicates. For details see
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/

On Mon, Aug 2, 2021 at 2:33 PM, Jim Chen wrote:
> I restarted via a savepoint with the following commands:
>
> Cancel command:
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> -yid application_1625497885855_698371 \
> -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
> 59cf6ccc83aa163bd1e0cd3304dfe06a
>
> Printed savepoint:
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>
> Restart command:
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> -m yarn-cluster \
> -yjm 4096 -ytm 4096 \
> -ynm User_Click_Log_Split_All \
> -yqu syh_offline \
> -ys 2 \
> -d \
> -p 64 \
> -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494 \
> -n \
> -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar

On Mon, Aug 2, 2021 at 2:01 PM, Jim Chen wrote:
> Hi all, I have a Flink job that consumes Kafka topic A and writes to Kafka topic B. After restarting the job from a savepoint, I found duplicated data in topic B. Can anyone help me understand why? Thanks!
>
> My Versions
> Flink 1.12.4
> Kafka 2.0.1
> Java 1.8
>
> Core code:
>
> env.enableCheckpointing(30);
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
>
> tableEnv.createTemporaryView("data_table", dataDS);
> String sql = "select * from data_table a inner join hive_catalog.dim.dim.project for system_time as of a.proctime as b on a.id = b.id"
> Table table = tableEnv.sqlQuery(sql);
> DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
>
> // Kafka producer parameters
> Properties producerProps = new Properties();
> producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
> producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30);
> producerProps.put(ProducerConfig.MA
Re: [ANNOUNCE] Apache Flink 1.13.2 released
Thanks Yun Tang for being our release manager and everyone else who made the release possible! Best Regards, Yu On Fri, 6 Aug 2021 at 13:52, Yun Tang wrote: > > The Apache Flink community is very happy to announce the release of Apache > Flink 1.13.2, which is the second bugfix release for the Apache Flink 1.13 > series. > > Apache Flink® is an open-source stream processing framework for > distributed, high-performing, always-available, and accurate data streaming > applications. > > The release is available for download at: > https://flink.apache.org/downloads.html > > Please check out the release blog post for an overview of the improvements > for this bugfix release: > https://flink.apache.org/news/2021/08/06/release-1.13.2.html > > The full release notes are available in Jira: > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218&styleName=&projectId=12315522 > > We would like to thank all contributors of the Apache Flink community who > made this release possible! > > Regards, > Yun Tang >