我看flink sql写到kafka的时候,也没有开启事务。所以,这一点,我非常想不通
东东 于2021年8月9日周一 下午5:56写道:
>
>
>
> 有未提交的事务才会出现重复呗,也许你重启的时候恰好所有事务都已经提交了呢,比如说,有一段时间流里面没有新数据进来。
>
>
> 在 2021-08-09 17:44:57,"Jim Chen" 写道:
> >有个奇怪的问题:用flink sql去做上面的逻辑,*没有设置isolation.level 为 read_committed,*重启方式用stop
> >with savepoint,结果没有重复
> >
> >我
有未提交的事务才会出现重复呗,也许你重启的时候恰好所有事务都已经提交了呢,比如说,有一段时间流里面没有新数据进来。
在 2021-08-09 17:44:57,"Jim Chen" 写道:
>有个奇怪的问题:用flink sql去做上面的逻辑,*没有设置isolation.level 为 read_committed,*重启方式用stop
>with savepoint,结果没有重复
>
>我理解,flink sql没有设置隔离级别的话,应该会出现重复的。Tell me why..
>
>东东 于2021年8月2日周一 下午7:13写道:
>
>> 从topic
有个奇怪的问题:用flink sql去做上面的逻辑,*没有设置isolation.level 为 read_committed,*重启方式用stop
with savepoint,结果没有重复
我理解,flink sql没有设置隔离级别的话,应该会出现重复的。Tell me why..
东东 于2021年8月2日周一 下午7:13写道:
> 从topic B实时写到hive,这个job需要配置 isolation.level 为
> read_committed,否则会把还没有提交甚至是已经终止的事务消息读出来,这样就很难不出现重复了。
>
>
> 在 2021-08
从topic B实时写到hive,这个job需要配置 isolation.level 为
read_committed,否则会把还没有提交甚至是已经终止的事务消息读出来,这样就很难不出现重复了。
在 2021-08-02 19:00:13,"Jim Chen" 写道:
>我不太懂,下游的isolation.level是不是read_committed是啥意思。
>我是把topic A中的partitionId和offset写到消息体中,然后flink程序,把消息写到下游的topic B中。将topic
>B实时写到hive上,然后在hive表中,根据partitionId和offse
我不太懂,下游的isolation.level是不是read_committed是啥意思。
我是把topic A中的partitionId和offset写到消息体中,然后flink程序,把消息写到下游的topic B中。将topic
B实时写到hive上,然后在hive表中,根据partitionId和offset去重,发现有重复消费了
东东 于2021年8月2日周一 下午6:20写道:
> 下游如何发现重复数据的,下游的isolation.level是不是read_committed
>
>
> 在 2021-08-02 18:14:27,"Jim Chen" 写道:
> >Hi
下游如何发现重复数据的,下游的isolation.level是不是read_committed
在 2021-08-02 18:14:27,"Jim Chen" 写道:
>Hi 刘建刚,
>我使用了stop with savepoint,但是还是发现,下游有重复数据。
>停止命令:
>/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
>-yid application_1625497885855_703064 \
>-p
>hdfs://ztcluster/flink_realtime_warehouse/checkpoi
Hi 刘建刚,
我使用了stop with savepoint,但是还是发现,下游有重复数据。
停止命令:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
-yid application_1625497885855_703064 \
-p
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
\
-d 55e7ebb6fa38faaba61b4b9a7cd89827
重启命令:
/home/datadev/flink-1
cancel with savepoint是之前的接口了,会造成kafka数据的重复。新的stop with
savepoint则会在做savepoint的时候,不再发送数据,从而避免了重复数据,哭啼可以参考
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
Jim Chen 于2021年8月2日周一 下午2:33写道:
> 我是通过savepoint的方式重启的,命令如下:
>
> cancel command:
>
> /home/datadev/flink-1.12.
我是通过savepoint的方式重启的,命令如下:
cancel command:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
-yid application_1625497885855_698371 \
-s
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
\
59cf6ccc83aa163bd1e0cd3304dfe06a
print savepoint:
hdfs://ztcluster/fli
大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka topic B.
当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!
My Versions
Flink 1.12.4
Kafka 2.0.1
Java 1.8
Core code:
env.enableCheckpointing(30);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
10 matches
Mail list logo