Unsubscribe

2021-08-09 Post by xg...@126.com
Unsubscribe






Is Flink SQL backward compatible?

2021-08-09 Post by Jason Lee
Hi all,


I have a question: we are planning to upgrade our Flink version, and I would like to know whether the SQL of our existing jobs will still be compatible with the new version after the upgrade.
For example, if I upgrade to 1.13, will SQL written against 1.10 syntax still be accepted?


Thanks


Chuang Li
jasonlee1...@163.com



Re: How to join 3 kafka topics with Flink SQL and then run windowed aggregations on the result?

2021-08-09 Post by Caizhi Weng
Hi!

A regular stream-stream join does indeed drop the time attribute fields. If some of the tables change only rarely, you can turn those into lookup (dimension table) joins, which keeps the time attribute. If every table changes frequently, consider the newly introduced event time temporal join [1], which should cover this requirement.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join
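
For illustration, a minimal sketch of what such an event time temporal join can look like; the table names orders and device_versions and their columns are assumptions, and the versioned side needs a primary key plus an event-time attribute:

-- Illustrative only: orders is the probe-side stream, device_versions is a
-- versioned table with a PRIMARY KEY and an event-time attribute.
SELECT
  o.order_id,
  o.device_id,
  o.order_time,        -- the probe side's time attribute is preserved
  d.device_name
FROM orders AS o
LEFT JOIN device_versions FOR SYSTEM_TIME AS OF o.order_time AS d
  ON o.device_id = d.device_id;

Because the versioned table is joined as of the probe row's event time, the probe side's time attribute survives the join and can still feed a downstream window.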

wang guanglei  wrote on Tue, Aug 10, 2021 at 9:35 AM:

> Hi everyone,
>
> I have a question. We have 3 kafka topics: device, consumer, order.
> I want to compute several aggregated metrics with Flink SQL, e.g. the number of orders per deviceId over the past 12 hours.
> My design is as follows:
>
>   1.  First register table1, table2 and table3 via create table ... with(...='kafka') ..., specifying the event time and watermark.
>   2.  Join the 3 tables:
>
> create temporary view wide_table as (
>   select   ***
>   from   table1
>   join   table2  on ...
>   join   table3  on ...
>  )
>
>   3.  Run the aggregation on wide_table:
>
> select deviceId
>  ,HOP_START(voc.timestamp, INTERVAL '5' SECOND , INTERVAL '10' SECOND)
>  ,count(1) as cnt
> from wide_table as voc
> group by HOP(voc.timestamp, INTERVAL '5' SECOND , INTERVAL '10' SECOND)
>,deviceId
>
> After the job started running, there were never any aggregation results, although select * from wide_table showed the detail rows.
>
>
> After searching for quite a while, I found that the time attribute fields of table1/2/3 lose their time-attribute property after the join, so they can no longer be used in a window.
>
> Could anyone tell me how to implement this kind of multi-topic join followed by aggregation in Flink SQL? Or am I simply using it the wrong way?
>
> Thanks a lot!!!
>


Re: [ANNOUNCE] Apache Flink 1.13.2 released

2021-08-09 Post by Xintong Song
Thanks Yun and everyone~!

Thank you~

Xintong Song



On Mon, Aug 9, 2021 at 10:14 PM Till Rohrmann  wrote:

> Thanks Yun Tang for being our release manager and the great work! Also
> thanks a lot to everyone who contributed to this release.
>
> Cheers,
> Till
>
> On Mon, Aug 9, 2021 at 9:48 AM Yu Li  wrote:
>
>> Thanks Yun Tang for being our release manager and everyone else who made
>> the release possible!
>>
>> Best Regards,
>> Yu
>>
>>
>> On Fri, 6 Aug 2021 at 13:52, Yun Tang  wrote:
>>
>>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.13.2, which is the second bugfix release for the Apache
>>> Flink 1.13 series.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this bugfix release:
>>> https://flink.apache.org/news/2021/08/06/release-1.13.2.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218&styleName=&projectId=12315522
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Regards,
>>> Yun Tang
>>>
>>


How to join 3 kafka topics with Flink SQL and then run windowed aggregations on the result?

2021-08-09 Post by wang guanglei
Hi everyone,

I have a question. We have 3 kafka topics: device, consumer, order.
I want to compute several aggregated metrics with Flink SQL, e.g. the number of orders per deviceId over the past 12 hours.
My design is as follows:

  1.  First register table1, table2 and table3 via create table ... with(...='kafka') ..., specifying the event time and watermark (a DDL sketch follows after step 3).
  2.  Join the 3 tables:

create temporary view wide_table as (
  select   ***
  from   table1
  join   table2  on ...
  join   table3  on ...
 )

  3.  Run the aggregation on wide_table:

select deviceId
 ,HOP_START(voc.timestamp, INTERVAL '5' SECOND , INTERVAL '10' SECOND)
 ,count(1) as cnt
from wide_table as voc
group by HOP(voc.timestamp, INTERVAL '5' SECOND , INTERVAL '10' SECOND)
   ,deviceId
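
A minimal sketch of the DDL from step 1, i.e. one Kafka source table with an event-time attribute and watermark; the topic, broker address, column names and format below are assumptions for illustration:

-- Illustrative Kafka source table; adjust names and options to your setup.
CREATE TABLE table1 (
  deviceId STRING,
  `timestamp` TIMESTAMP(3),
  -- declare `timestamp` as the event-time attribute with a 5 second watermark delay
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'device',
  'properties.bootstrap.servers' = 'broker:9092',
  'properties.group.id' = 'demo',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

The HOP window in step 3 can then reference this event-time column.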

After the job started running, there were never any aggregation results, although select * from wide_table showed the detail rows.

After searching for quite a while, I found that the time attribute fields of table1/2/3 lose their time-attribute property after the join, so they can no longer be used in a window.

Could anyone tell me how to implement this kind of multi-topic join followed by aggregation in Flink SQL? Or am I simply using it the wrong way?

Thanks a lot!!!


Re: [ANNOUNCE] Apache Flink 1.13.2 released

2021-08-09 Post by Till Rohrmann
Thanks Yun Tang for being our release manager and the great work! Also
thanks a lot to everyone who contributed to this release.

Cheers,
Till

On Mon, Aug 9, 2021 at 9:48 AM Yu Li  wrote:

> Thanks Yun Tang for being our release manager and everyone else who made
> the release possible!
>
> Best Regards,
> Yu
>
>
> On Fri, 6 Aug 2021 at 13:52, Yun Tang  wrote:
>
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.13.2, which is the second bugfix release for the Apache
>> Flink 1.13 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2021/08/06/release-1.13.2.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218&styleName=&projectId=12315522
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Yun Tang
>>
>


Re: Re: Re: Re: Restarting a job from a savepoint, flink consuming kafka, duplicate messages

2021-08-09 Post by Jim Chen
From what I can see, flink sql does not enable transactions when writing to kafka either, which is exactly the part I cannot figure out.
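
For context, a sketch of how transactional writes are switched on for a SQL Kafka sink in Flink 1.12/1.13 (table, topic and column names here are assumptions); without 'sink.semantic' = 'exactly-once' the sink writes without Kafka transactions, which would be consistent with the observation above:

-- Illustrative sink table; by default the SQL Kafka sink uses
-- 'sink.semantic' = 'at-least-once', i.e. no Kafka transactions.
CREATE TABLE sink_topic_b (
  id STRING,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_B',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json',
  'sink.semantic' = 'exactly-once'   -- enables transactional, two-phase-commit writes
);

With the default at-least-once semantic there are no transactions for a downstream reader to wait on, so the consumer's isolation level makes no difference.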


东东  wrote on Mon, Aug 9, 2021 at 5:56 PM:

>
>
>
> Duplicates only show up when there are uncommitted transactions. Maybe when you restarted, all transactions happened to be committed already, for example because no new data had arrived in the stream for a while.
>
>
> At 2021-08-09 17:44:57, "Jim Chen"  wrote:
> >Here is something strange: I implemented the same logic with flink sql, *without* setting isolation.level to read_committed, and restarted the job with stop
> >with savepoint, yet there were no duplicates.
> >
> >My understanding is that without setting the isolation level, flink sql should produce duplicates. Tell me why..
> >
> >东东  wrote on Mon, Aug 2, 2021 at 7:13 PM:
> >
> >> The job that writes topic B to hive in real time needs to set isolation.level to
> >> read_committed, otherwise it will also read messages from transactions that are not yet committed or even aborted, and then duplicates are hard to avoid.
> >>
> >>
> >> At 2021-08-02 19:00:13, "Jim Chen"  wrote:
> >> >I don't quite understand what "is the downstream isolation.level read_committed" means.
> >> >I write the partitionId and offset of topic A into the message body, and the flink program writes the messages to the downstream topic B. Topic
> >> >B is written to hive in real time, and when I deduplicate in the hive table by partitionId and offset, I find duplicate consumption.
> >> >
> >> >东东  wrote on Mon, Aug 2, 2021 at 6:20 PM:
> >> >
> >> >> How did the downstream detect the duplicate data? Is the downstream isolation.level set to read_committed?
> >> >>
> >> >>
> >> >> At 2021-08-02 18:14:27, "Jim Chen"  wrote:
> >> >> >Hi 刘建刚,
> >> >> >I used stop with savepoint, but I still see duplicate data downstream.
> >> >> >Stop command:
> >> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
> >> >> >-yid application_1625497885855_703064 \
> >> >> >-p
> >> >>
> >> >>
> >>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> >> >\
> >> >> >-d 55e7ebb6fa38faaba61b4b9a7cd89827
> >> >> >
> >> >> >Restart command:
> >> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> >> >-m yarn-cluster \
> >> >> >-yjm 4096 -ytm 4096 \
> >> >> >-ynm User_Click_Log_Split_All \
> >> >> >-yqu syh_offline \
> >> >> >-ys 2 \
> >> >> >-d \
> >> >> >-p 64 \
> >> >> >-s
> >> >>
> >> >>
> >>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
> >> >> >\
> >> >> >-n \
> >> >> >-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >> >>
> >> >>
> >>
> >/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >> >> >
> >> >> >
> >> >> >刘建刚  wrote on Mon, Aug 2, 2021 at 3:49 PM:
> >> >> >
> >> >> >> cancel with savepoint is the older interface and will cause duplicate kafka data. The newer stop with
> >> >> >> savepoint stops emitting data while the savepoint is being taken, which avoids the duplicates. For details see
> >> >> >>
> >> >> >>
> >> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
> >> >> >>
> >> >> >> Jim Chen  wrote on Mon, Aug 2, 2021 at 2:33 PM:
> >> >> >>
> >> >> >> > I restarted via a savepoint; the commands are as follows:
> >> >> >> >
> >> >> >> > cancel command:
> >> >> >> >
> >> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> >> >> >> > -yid application_1625497885855_698371 \
> >> >> >> > -s
> >> >> >> >
> >> >> >> >
> >> >> >>
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> >> >> > \
> >> >> >> > 59cf6ccc83aa163bd1e0cd3304dfe06a
> >> >> >> >
> >> >> >> > print savepoint:
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >>
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> >> >> >
> >> >> >> >
> >> >> >> > restart command:
> >> >> >> >
> >> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> >> >> > -m yarn-cluster \
> >> >> >> > -yjm 4096 -ytm 4096 \
> >> >> >> > -ynm User_Click_Log_Split_All \
> >> >> >> > -yqu syh_offline \
> >> >> >> > -ys 2 \
> >> >> >> > -d \
> >> >> >> > -p 64 \
> >> >> >> > -s
> >> >> >> >
> >> >> >> >
> >> >> >>
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> >> >> > \
> >> >> >> > -n \
> >> >> >> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >> >> >> >
> >> >> >> >
> >> >> >>
> >> >>
> >>
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >> >> >> >
> >> >> >> > Jim Chen  wrote on Mon, Aug 2, 2021 at 2:01 PM:
> >> >> >> >
> >> >> >> > > Hi all, I have a flink job that consumes kafka topic A and writes to kafka topic B.
> >> >> >> > > After I restarted the job from a savepoint, I found duplicated data in topic B. Can anyone help me explain this? Thanks!
> >> >> >> > >
> >> >> >> > > My Versions
> >> >> >> > > Flink 1.12.4
> >> >> >> > > Kafka 2.0.1
> >> >> >> > > Java 1.8
> >> >> >> > >
> >> >> >> > > Core code:
> >> >> >> > >
> >> >> >> > > env.enableCheckpointing(30);
> >> >> >> > >
> >> >> >> > >
> >> >> >> >
> >> >> >>
> >> >>
> >>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >> >> >> > >
> >> >> >> > >
> >> >> >> >
> >> >> >>
> >> >>
> >>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >> >> >> > >
> >> >> >> > > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
> >> >> >> > >
> >> >> >> > > tableEnv.createTemporaryView("data_table",dataDS);
> >> >> >> > > String sql = "select * from data_table a inner join
> >> >> >> > > hive_catalog.dim.dim.project for system_time as of a.proctime
> as
> >> b
> >> >> on
> >> >> >> > a.id
> >> >> >> > > = b.id"
> >> >> >> > > Table table = tableEnv.sqlQuery(sql);
> >> >> >> > > DataStream resultDS = tableEnv.toAppendStream(table,
> >> >> >> Row.class).map(xx);
> >> >> >> >

Re:Re: Re: Re: Restarting a job from a savepoint, flink consuming kafka, duplicate messages

2021-08-09 Post by 东东



Duplicates only show up when there are uncommitted transactions. Maybe when you restarted, all transactions happened to be committed already, for example because no new data had arrived in the stream for a while.


At 2021-08-09 17:44:57, "Jim Chen"  wrote:
>Here is something strange: I implemented the same logic with flink sql, *without* setting isolation.level to read_committed, and restarted the job with stop
>with savepoint, yet there were no duplicates.
>
>My understanding is that without setting the isolation level, flink sql should produce duplicates. Tell me why..
>
>东东  wrote on Mon, Aug 2, 2021 at 7:13 PM:
>
>> The job that writes topic B to hive in real time needs to set isolation.level to
>> read_committed, otherwise it will also read messages from transactions that are not yet committed or even aborted, and then duplicates are hard to avoid.
>>
>>
>> At 2021-08-02 19:00:13, "Jim Chen"  wrote:
>> >I don't quite understand what "is the downstream isolation.level read_committed" means.
>> >I write the partitionId and offset of topic A into the message body, and the flink program writes the messages to the downstream topic B. Topic
>> >B is written to hive in real time, and when I deduplicate in the hive table by partitionId and offset, I find duplicate consumption.
>> >
>> >东东  wrote on Mon, Aug 2, 2021 at 6:20 PM:
>> >
>> >> How did the downstream detect the duplicate data? Is the downstream isolation.level set to read_committed?
>> >>
>> >>
>> >> At 2021-08-02 18:14:27, "Jim Chen"  wrote:
>> >> >Hi 刘建刚,
>> >> >I used stop with savepoint, but I still see duplicate data downstream.
>> >> >Stop command:
>> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
>> >> >-yid application_1625497885855_703064 \
>> >> >-p
>> >>
>> >>
>> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
>> >> >\
>> >> >-d 55e7ebb6fa38faaba61b4b9a7cd89827
>> >> >
>> >> >Restart command:
>> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
>> >> >-m yarn-cluster \
>> >> >-yjm 4096 -ytm 4096 \
>> >> >-ynm User_Click_Log_Split_All \
>> >> >-yqu syh_offline \
>> >> >-ys 2 \
>> >> >-d \
>> >> >-p 64 \
>> >> >-s
>> >>
>> >>
>> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
>> >> >\
>> >> >-n \
>> >> >-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>> >>
>> >>
>> >/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>> >> >
>> >> >
>> >> >刘建刚  wrote on Mon, Aug 2, 2021 at 3:49 PM:
>> >> >
>> >> >> cancel with savepoint is the older interface and will cause duplicate kafka data. The newer stop with
>> >> >> savepoint stops emitting data while the savepoint is being taken, which avoids the duplicates. For details see
>> >> >>
>> >> >>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
>> >> >>
>> >> >> Jim Chen  wrote on Mon, Aug 2, 2021 at 2:33 PM:
>> >> >>
>> >> >> > I restarted via a savepoint; the commands are as follows:
>> >> >> >
>> >> >> > cancel command:
>> >> >> >
>> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
>> >> >> > -yid application_1625497885855_698371 \
>> >> >> > -s
>> >> >> >
>> >> >> >
>> >> >>
>> >>
>> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
>> >> >> > \
>> >> >> > 59cf6ccc83aa163bd1e0cd3304dfe06a
>> >> >> >
>> >> >> > print savepoint:
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >>
>> >>
>> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>> >> >> >
>> >> >> >
>> >> >> > restart command:
>> >> >> >
>> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
>> >> >> > -m yarn-cluster \
>> >> >> > -yjm 4096 -ytm 4096 \
>> >> >> > -ynm User_Click_Log_Split_All \
>> >> >> > -yqu syh_offline \
>> >> >> > -ys 2 \
>> >> >> > -d \
>> >> >> > -p 64 \
>> >> >> > -s
>> >> >> >
>> >> >> >
>> >> >>
>> >>
>> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>> >> >> > \
>> >> >> > -n \
>> >> >> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>> >> >> >
>> >> >> >
>> >> >>
>> >>
>> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>> >> >> >
>> >> >> > Jim Chen  wrote on Mon, Aug 2, 2021 at 2:01 PM:
>> >> >> >
>> >> >> > > Hi all, I have a flink job that consumes kafka topic A and writes to kafka topic B.
>> >> >> > > After I restarted the job from a savepoint, I found duplicated data in topic B. Can anyone help me explain this? Thanks!
>> >> >> > >
>> >> >> > > My Versions
>> >> >> > > Flink 1.12.4
>> >> >> > > Kafka 2.0.1
>> >> >> > > Java 1.8
>> >> >> > >
>> >> >> > > Core code:
>> >> >> > >
>> >> >> > > env.enableCheckpointing(30);
>> >> >> > >
>> >> >> > >
>> >> >> >
>> >> >>
>> >>
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> >> >> > >
>> >> >> > >
>> >> >> >
>> >> >>
>> >>
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> >> >> > >
>> >> >> > > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
>> >> >> > >
>> >> >> > > tableEnv.createTemporaryView("data_table",dataDS);
>> >> >> > > String sql = "select * from data_table a inner join
>> >> >> > > hive_catalog.dim.dim.project for system_time as of a.proctime as
>> b
>> >> on
>> >> >> > a.id
>> >> >> > > = b.id"
>> >> >> > > Table table = tableEnv.sqlQuery(sql);
>> >> >> > > DataStream resultDS = tableEnv.toAppendStream(table,
>> >> >> Row.class).map(xx);
>> >> >> > >
>> >> >> > > // Kafka producer parameter
>> >> >> > > Properties producerProps = new Properties();
>> >> >> > > producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>> >> >> > > bootstrapServers);
>> >> >> > > producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
>> >> >> > > producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
>> >> >> > kafkaBufferMemory);
>> >> >> > > producerProps.put(ProducerCo

Re: Re: Re: Restarting a job from a savepoint, flink consuming kafka, duplicate messages

2021-08-09 Post by Jim Chen
Here is something strange: I implemented the same logic with flink sql, *without* setting isolation.level to read_committed, and restarted the job with stop
with savepoint, yet there were no duplicates.

My understanding is that without setting the isolation level, flink sql should produce duplicates. Tell me why..
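
For reference, a sketch of how the read isolation level can be set on a Flink SQL Kafka source through the pass-through client properties (table, topic and column names here are assumptions); without it the underlying consumer uses Kafka's default, read_uncommitted:

-- Illustrative source table for the downstream job reading topic_B;
-- 'properties.*' options are forwarded to the underlying Kafka consumer.
CREATE TABLE source_topic_b (
  id STRING,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_B',
  'properties.bootstrap.servers' = 'broker:9092',
  'properties.group.id' = 'hive_loader',
  'properties.isolation.level' = 'read_committed',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'json'
);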

东东  wrote on Mon, Aug 2, 2021 at 7:13 PM:

> The job that writes topic B to hive in real time needs to set isolation.level to
> read_committed, otherwise it will also read messages from transactions that are not yet committed or even aborted, and then duplicates are hard to avoid.
>
>
> At 2021-08-02 19:00:13, "Jim Chen"  wrote:
> >I don't quite understand what "is the downstream isolation.level read_committed" means.
> >I write the partitionId and offset of topic A into the message body, and the flink program writes the messages to the downstream topic B. Topic
> >B is written to hive in real time, and when I deduplicate in the hive table by partitionId and offset, I find duplicate consumption.
> >
> >东东  wrote on Mon, Aug 2, 2021 at 6:20 PM:
> >
> >> How did the downstream detect the duplicate data? Is the downstream isolation.level set to read_committed?
> >>
> >>
> >> At 2021-08-02 18:14:27, "Jim Chen"  wrote:
> >> >Hi 刘建刚,
> >> >I used stop with savepoint, but I still see duplicate data downstream.
> >> >Stop command:
> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
> >> >-yid application_1625497885855_703064 \
> >> >-p
> >>
> >>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> >\
> >> >-d 55e7ebb6fa38faaba61b4b9a7cd89827
> >> >
> >> >Restart command:
> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> >-m yarn-cluster \
> >> >-yjm 4096 -ytm 4096 \
> >> >-ynm User_Click_Log_Split_All \
> >> >-yqu syh_offline \
> >> >-ys 2 \
> >> >-d \
> >> >-p 64 \
> >> >-s
> >>
> >>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
> >> >\
> >> >-n \
> >> >-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >>
> >>
> >/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >> >
> >> >
> >> >刘建刚  wrote on Mon, Aug 2, 2021 at 3:49 PM:
> >> >
> >> >> cancel with savepoint is the older interface and will cause duplicate kafka data. The newer stop with
> >> >> savepoint stops emitting data while the savepoint is being taken, which avoids the duplicates. For details see
> >> >>
> >> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
> >> >>
> >> >> Jim Chen  wrote on Mon, Aug 2, 2021 at 2:33 PM:
> >> >>
> >> >> > I restarted via a savepoint; the commands are as follows:
> >> >> >
> >> >> > cancel command:
> >> >> >
> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> >> >> > -yid application_1625497885855_698371 \
> >> >> > -s
> >> >> >
> >> >> >
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> >> > \
> >> >> > 59cf6ccc83aa163bd1e0cd3304dfe06a
> >> >> >
> >> >> > print savepoint:
> >> >> >
> >> >> >
> >> >> >
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> >> >
> >> >> >
> >> >> > restart command:
> >> >> >
> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> >> > -m yarn-cluster \
> >> >> > -yjm 4096 -ytm 4096 \
> >> >> > -ynm User_Click_Log_Split_All \
> >> >> > -yqu syh_offline \
> >> >> > -ys 2 \
> >> >> > -d \
> >> >> > -p 64 \
> >> >> > -s
> >> >> >
> >> >> >
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> >> > \
> >> >> > -n \
> >> >> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >> >> >
> >> >> >
> >> >>
> >>
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >> >> >
> >> >> > Jim Chen  wrote on Mon, Aug 2, 2021 at 2:01 PM:
> >> >> >
> >> >> > > Hi all, I have a flink job that consumes kafka topic A and writes to kafka topic B.
> >> >> > > After I restarted the job from a savepoint, I found duplicated data in topic B. Can anyone help me explain this? Thanks!
> >> >> > >
> >> >> > > My Versions
> >> >> > > Flink 1.12.4
> >> >> > > Kafka 2.0.1
> >> >> > > Java 1.8
> >> >> > >
> >> >> > > Core code:
> >> >> > >
> >> >> > > env.enableCheckpointing(30);
> >> >> > >
> >> >> > >
> >> >> >
> >> >>
> >>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >> >> > >
> >> >> > >
> >> >> >
> >> >>
> >>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >> >> > >
> >> >> > > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
> >> >> > >
> >> >> > > tableEnv.createTemporaryView("data_table",dataDS);
> >> >> > > String sql = "select * from data_table a inner join
> >> >> > > hive_catalog.dim.dim.project for system_time as of a.proctime as
> b
> >> on
> >> >> > a.id
> >> >> > > = b.id"
> >> >> > > Table table = tableEnv.sqlQuery(sql);
> >> >> > > DataStream resultDS = tableEnv.toAppendStream(table,
> >> >> Row.class).map(xx);
> >> >> > >
> >> >> > > // Kafka producer parameter
> >> >> > > Properties producerProps = new Properties();
> >> >> > > producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> >> >> > > bootstrapServers);
> >> >> > > producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> >> >> > > producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
> >> >> > kafkaBufferMemory);
> >> >> > > producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG,
> kafkaBatchSize);
> >> >> > > producerProps.put(ProducerConfig.LINGER_MS_CONFIG,
> kafkaLingerMs);
> >> >> > > producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
> >> 30);
> >> >> > >
> >> producerProps.put(ProducerConfig.MA

Re: [ANNOUNCE] Apache Flink 1.13.2 released

2021-08-09 Post by Yu Li
Thanks Yun Tang for being our release manager and everyone else who made
the release possible!

Best Regards,
Yu


On Fri, 6 Aug 2021 at 13:52, Yun Tang  wrote:

>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.13.2, which is the second bugfix release for the Apache Flink 1.13
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2021/08/06/release-1.13.2.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218&styleName=&projectId=12315522
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Yun Tang
>