Hi,
I'm using Flink 1.2.0 and trying to achieve "exactly once" data transfer
from Kafka to Elasticsearch, but I haven't been able to get it working.
(Scala 2.11, Kafka 0.10, without YARN)

There are two Flink TaskManager nodes. While the job is running with
parallelism 2, I shut one of them down to simulate a node failure.

Using flink-connector-kafka, I wrote the following code:

   import java.util.Properties;

   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
   import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

   StreamExecutionEnvironment env = StreamExecutionEnvironment
         .getExecutionEnvironment();
   // Checkpoint every second; exactly-once is the default checkpointing mode.
   env.enableCheckpointing(1000L);
   env.setParallelism(2);

   Properties kafkaProp = new Properties();
   kafkaProp.setProperty("bootstrap.servers", "192.168.97.42:9092");
   // zookeeper.connect is only required by the 0.8 consumer; harmless here.
   kafkaProp.setProperty("zookeeper.connect", "192.168.97.42:2181");
   kafkaProp.setProperty("group.id", "id");

   DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
         "topic", new SimpleStringSchema(), kafkaProp));

After the job recovers, I see duplicate records in my map function:
records processed since the last checkpoint completed before the node
failure appear to be replayed and processed again.
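
The map in question is essentially an identity function that logs each
record, which is how the duplicates become visible (simplified):

   import org.apache.flink.api.common.functions.MapFunction;

   stream.map(new MapFunction<String, String>() {
      @Override
      public String map(String value) throws Exception {
         // After recovery, replayed records pass through here a second time.
         System.out.println("processing: " + value);
         return value;
      }
   });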

Is there any way to achieve "exactly once" delivery in the presence of
node failures?


Thanks.
Yuichiro
