Hi all, my Flink job consumes Kafka topic A and writes to Kafka topic B.
When I restart the job from a savepoint, topic B ends up with some duplicate
messages. Can anyone help me solve this problem? Thanks!

My Versions:
Flink 1.12.4
Kafka 2.0.1
Java 1.8

Core code:
env.enableCheckpointing(300000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);

tableEnv.createTemporaryView("data_table",dataDS);
String sql = "select * from data_table a inner join "
        + "hive_catalog.dim.dim.project for system_time as of a.proctime as b "
        + "on a.id = b.id";
Table table = tableEnv.sqlQuery(sql);
DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);

// Kafka producer parameters
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

resultDS.addSink(new FlinkKafkaProducer<JSONObject>(sinkTopic, new JSONSchema(),
        producerProps, new FlinkFixedPartitioner<>(),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
        .setParallelism(sinkParallelism);
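
A possibly related point, stated as an assumption rather than a diagnosis: with Semantic.EXACTLY_ONCE the producer writes to topic B inside Kafka transactions, and a Kafka consumer whose isolation.level is left at its default (read_uncommitted) also sees records from transactions that were later aborted, which can look like duplicates after a restart. The sketch below builds consumer properties for checking topic B with isolation.level=read_committed. The class name, method name, group id and broker address are hypothetical and not part of the job above. Plain string keys are used so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

public class ReadCommittedConsumerProps {

    /**
     * Builds consumer properties that only expose records from committed
     * Kafka transactions. All names here are illustrative assumptions.
     */
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The Kafka default is read_uncommitted, which also returns records
        // written by transactions that were aborted.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and group id, for illustration only.
        Properties props = build("localhost:9092", "topic-b-duplicate-check");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

These properties could be passed to a KafkaConsumer, or to the Kafka source of whatever job reads topic B, to see whether the duplicates are still visible under read_committed.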
