Hi!

Exactly-once end-to-end requires sinks that support that kind of behavior
(typically some form of transaction support).

Kafka currently does not have the mechanisms in place to support
exactly-once sinks, but the Kafka project is working on that feature.
For Elasticsearch, an exactly-once sink is also not directly possible
(because of missing transactions), but you can use Flink's state as the
"authoritative" state (it is exactly-once) and then write changes to
Flink's state to Elasticsearch. That way the writes to Elasticsearch
become "idempotent", which means duplicates simply make no additional
changes.
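
For illustration, here is a minimal sketch of such an idempotent write
using the elasticsearch2 connector (the index/type names and the key
extraction below are placeholders, not taken from your setup): the
document id is derived deterministically from the record, so a replayed
record overwrites the same document instead of creating a duplicate.

   import java.util.Collections;
   import org.apache.flink.api.common.functions.RuntimeContext;
   import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
   import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
   import org.elasticsearch.client.Requests;

   ElasticsearchSinkFunction<String> sinkFunction =
         new ElasticsearchSinkFunction<String>() {
            @Override
            public void process(String element, RuntimeContext ctx,
                  RequestIndexer indexer) {
               // Deterministic id: the same record always maps to the same
               // document, so re-processing after a failure is idempotent.
               String docId = extractKey(element); // placeholder key extraction
               indexer.add(Requests.indexRequest()
                     .index("my-index")   // placeholder index name
                     .type("my-type")     // placeholder type name
                     .id(docId)
                     .source(Collections.singletonMap("value", element)));
            }
         };

You would then pass this function into the ElasticsearchSink that you
add to your stream with addSink().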

Hope that helps!

Stephan


On Mon, Feb 20, 2017 at 5:53 PM, Y. Sakamoto <phonypian...@gmail.com> wrote:

> Hi,
> I'm using Flink 1.2.0 and trying to achieve "exactly once" data transfer
> from Kafka to Elasticsearch, but I cannot.
> (Scala 2.11, Kafka 0.10, without YARN)
>
> There are 2 Flink TaskManager nodes, and while processing
> with a parallelism of 2, I shut down one of them (simulating a node failure).
>
> Using flink-connector-kafka, I wrote the following code:
>
>    StreamExecutionEnvironment env = StreamExecutionEnvironment
>          .getExecutionEnvironment();
>    env.enableCheckpointing(1000L);
>    env.setParallelism(2);
>
>    Properties kafkaProp = new Properties();
>    kafkaProp.setProperty("bootstrap.servers", "192.168.97.42:9092");
>    kafkaProp.setProperty("zookeeper.connect", "192.168.97.42:2181");
>    kafkaProp.setProperty("group.id", "id");
>
>    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
>          "topic", new SimpleStringSchema(), kafkaProp));
>
> I found duplicated data in my map function.
> Data from the checkpoint before the node failure seems to be duplicated.
>
> Is there any way to achieve "exactly once" on failure?
>
>
> Thanks.
> Yuichiro
>
