Hi all, my Flink job consumes Kafka topic A and writes to Kafka topic B. When I restart the job from a savepoint, topic B contains some duplicate messages. Can anyone help me solve this problem? Thanks!
My versions: Flink 1.12.4, Kafka 2.0.1, Java 1.8.

Core code:

```java
env.enableCheckpointing(300000);
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
tableEnv.createTemporaryView("data_table", dataDS);

// Lookup join against a Hive dimension table
String sql = "select * from data_table a inner join hive_catalog.dim.dim.project "
        + "for system_time as of a.proctime as b on a.id = b.id";
Table table = tableEnv.sqlQuery(sql);
DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);

// Kafka producer parameters
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

resultDS.addSink(new FlinkKafkaProducer<JSONObject>(
                sinkTopic,
                new JSONSchema(),
                producerProps,
                new FlinkFixedPartitioner<>(),
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                5)) // kafkaProducersPoolSize
        .setParallelism(sinkParallelism);
```
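One cause worth ruling out (an assumption, not confirmed by the post): with `Semantic.EXACTLY_ONCE`, the sink writes to topic B inside Kafka transactions, and records from transactions that are aborted on restore stay in the log. A reader of topic B that keeps Kafka's default `isolation.level=read_uncommitted` still sees those aborted records, which can look like duplicates. The sketch below only builds consumer settings that restrict a reader to committed data; the class name `ReadCommittedConsumerConfig`, the group id, and the String deserializers are hypothetical choices for illustration, and the keys are written as plain strings so the snippet needs nothing but the JDK.

```java
import java.util.Properties;

/**
 * Hypothetical helper: consumer settings for a reader of topic B that should
 * only see records from committed Kafka transactions.
 */
public class ReadCommittedConsumerConfig {

    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Same key as ConsumerConfig.ISOLATION_LEVEL_CONFIG in kafka-clients.
        // The default, "read_uncommitted", also returns records from aborted
        // transactions; "read_committed" hides them.
        props.setProperty("isolation.level", "read_committed");
        // Assumed String keys/values, for illustration only.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "topic-b-checker");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

With kafka-clients on the classpath, these properties can be passed to `new KafkaConsumer<String, String>(props)`, or to a `FlinkKafkaConsumer` if topic B is read by another Flink job. If the duplicates disappear when reading this way, they were most likely records from aborted transactions rather than committed duplicates.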