From what I can see, Flink SQL doesn't even open transactions when it writes to Kafka. That's the part I really can't figure out.
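(For context, a minimal sketch of how a transactional Kafka sink would be declared on the SQL side, assuming the Flink 1.12 Kafka SQL connector's 'sink.semantic' option, which defaults to 'at-least-once', i.e. non-transactional writes; the table name and columns here are invented:)

    // Sketch only: assumes the Flink 1.12 Kafka SQL connector.
    // 'sink.semantic' defaults to 'at-least-once'; Kafka transactions are
    // only opened when it is set to 'exactly-once'.
    tableEnv.executeSql(
        "CREATE TABLE topic_b_sink (" +            // hypothetical table name
        "  id STRING," +                           // hypothetical columns
        "  payload STRING" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'topic_B'," +
        "  'properties.bootstrap.servers' = '...'," +
        "  'format' = 'json'," +
        "  'sink.semantic' = 'exactly-once'" +
        ")");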
On Mon, Aug 9, 2021 at 5:56 PM, 东东 <dongdongking...@163.com> wrote:

> Duplicates only show up when there are uncommitted transactions. Maybe all
> transactions happened to be committed at the moment you restarted; for
> example, no new data had come into the stream for a while.

On 2021-08-09 17:44:57, "Jim Chen" <chenshuai19950...@gmail.com> wrote:

> Here's an odd problem: I implemented the same logic with Flink SQL,
> *without setting isolation.level to read_committed*, and restarted with
> stop with savepoint, yet there were no duplicates...
>
> My understanding is that without the isolation level set, Flink SQL should
> produce duplicates. Tell me why...

On Mon, Aug 2, 2021 at 7:13 PM, 东东 <dongdongking...@163.com> wrote:

> The job that streams topic B into Hive needs isolation.level set to
> read_committed; otherwise it will read messages from transactions that have
> not been committed, or have even been aborted, and then it's hard not to
> see duplicates.

On 2021-08-02 19:00:13, "Jim Chen" <chenshuai19950...@gmail.com> wrote:

> I don't quite understand what "whether the downstream's isolation.level is
> read_committed" means. I write topic A's partitionId and offset into the
> message body, and the Flink job writes the messages to the downstream topic
> B. Topic B is streamed into Hive in real time, and when I deduplicate the
> Hive table by partitionId and offset, I find duplicate consumption.

On Mon, Aug 2, 2021 at 6:20 PM, 东东 <dongdongking...@163.com> wrote:

> How did the downstream detect the duplicate data? Is the downstream
> consumer's isolation.level read_committed?
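(A minimal sketch of the consumer-side setting being discussed, for the job that reads topic B into Hive; ConsumerConfig.ISOLATION_LEVEL_CONFIG is the standard kafka-clients property, while the group id is made up and JSONSchema is the class from the core code further down:)

    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_b_to_hive"); // hypothetical group id
    // Kafka consumers default to read_uncommitted; read_committed skips
    // messages belonging to open or aborted transactions.
    consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

    FlinkKafkaConsumer<JSONObject> topicBSource =
            new FlinkKafkaConsumer<>("topic_B", new JSONSchema(), consumerProps);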
On 2021-08-02 18:14:27, "Jim Chen" <chenshuai19950...@gmail.com> wrote:

> Hi 刘建刚,
> I used stop with savepoint, but I still see duplicate data downstream.
>
> Stop command:
>
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
> -yid application_1625497885855_703064 \
> -p hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
> -d 55e7ebb6fa38faaba61b4b9a7cd89827
>
> Restart command:
>
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> -m yarn-cluster \
> -yjm 4096 -ytm 4096 \
> -ynm User_Click_Log_Split_All \
> -yqu syh_offline \
> -ys 2 \
> -d \
> -p 64 \
> -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5 \
> -n \
> -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar

On Mon, Aug 2, 2021 at 3:49 PM, 刘建刚 <liujiangangp...@gmail.com> wrote:

> cancel with savepoint is the older interface and can cause duplicate Kafka
> data. The newer stop with savepoint stops sending data while the savepoint
> is being taken, which avoids the duplicates. For details, see
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/

On Mon, Aug 2, 2021 at 2:33 PM, Jim Chen <chenshuai19950...@gmail.com> wrote:

> I restarted via a savepoint; the commands are as follows:
>
> Cancel command:
>
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> -yid application_1625497885855_698371 \
> -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
> 59cf6ccc83aa163bd1e0cd3304dfe06a
>
> Printed savepoint path:
>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>
> Restart command:
>
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> -m yarn-cluster \
> -yjm 4096 -ytm 4096 \
> -ynm User_Click_Log_Split_All \
> -yqu syh_offline \
> -ys 2 \
> -d \
> -p 64 \
> -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494 \
> -n \
> -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar

On Mon, Aug 2, 2021 at 2:01 PM, Jim Chen <chenshuai19950...@gmail.com> wrote:

> Hi everyone, I have a Flink job that consumes Kafka topic A and writes to
> Kafka topic B. After I restart the job from a savepoint, I find duplicated
> data in topic B. Could anyone help me understand why? Thanks!
>
> My versions:
> Flink 1.12.4
> Kafka 2.0.1
> Java 1.8
>
> Core code:
>
> // checkpoint every 5 minutes, exactly-once mode, retained on cancellation
> env.enableCheckpointing(300000);
> env.getCheckpointConfig().enableExternalizedCheckpoints(
>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
>
> // temporal join against a Hive dimension table
> tableEnv.createTemporaryView("data_table", dataDS);
> String sql = "select * from data_table a inner join"
>         + " hive_catalog.dim.dim.project for system_time as of a.proctime as b"
>         + " on a.id = b.id";
> Table table = tableEnv.sqlQuery(sql);
> DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
>
> // Kafka producer parameters
> Properties producerProps = new Properties();
> producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
> producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);
> producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
> producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
>
> // EXACTLY_ONCE semantic: the sink writes inside Kafka transactions that
> // are committed when checkpoints complete
> resultDS.addSink(new FlinkKafkaProducer<JSONObject>(sinkTopic, new JSONSchema(),
>         producerProps, new FlinkFixedPartitioner<>(),
>         FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
>     .setParallelism(sinkParallelism);
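(For reference, a minimal sketch of the partitionId/offset tagging Jim describes upthread, using Flink's KafkaDeserializationSchema, which exposes the underlying ConsumerRecord; the TaggedEvent type and its field names are hypothetical:)

    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Hypothetical value type carrying the provenance fields used for dedup.
    class TaggedEvent {
        public int partitionId;
        public long offset;
        public String payload;
    }

    public class TaggedEventSchema implements KafkaDeserializationSchema<TaggedEvent> {
        @Override
        public boolean isEndOfStream(TaggedEvent next) {
            return false; // unbounded stream, never ends
        }

        @Override
        public TaggedEvent deserialize(ConsumerRecord<byte[], byte[]> record) {
            // Stamp each event with its origin in topic A so the Hive side
            // can deduplicate on (partitionId, offset).
            TaggedEvent e = new TaggedEvent();
            e.partitionId = record.partition();
            e.offset = record.offset();
            e.payload = new String(record.value(), StandardCharsets.UTF_8);
            return e;
        }

        @Override
        public TypeInformation<TaggedEvent> getProducedType() {
            return TypeInformation.of(TaggedEvent.class);
        }
    }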