Hi,

exactly. You have to make sure that you can write data for the same ID
multiple times, i.e. that the write is idempotent and a duplicate record
simply overwrites the previously written document.
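
As a rough sketch of what such an idempotent write could look like with the
Elasticsearch connector's ElasticsearchSinkFunction (the index/type names and
the Tuple2 layout are placeholders I made up, with f0 as the document ID and
f1 as the payload):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.elasticsearch.client.Requests;

    // Uses f0 as a deterministic document ID, so a record that is replayed
    // after a failure overwrites the same document instead of adding a new one.
    public class IdempotentEsSink implements ElasticsearchSinkFunction<Tuple2<String, String>> {

        @Override
        public void process(Tuple2<String, String> element, RuntimeContext ctx,
                            RequestIndexer indexer) {
            Map<String, Object> json = new HashMap<>();
            json.put("payload", element.f1);

            indexer.add(Requests.indexRequest()
                    .index("my-index")   // placeholder index name
                    .type("my-type")     // placeholder type name
                    .id(element.f0)      // same ID -> duplicate write overwrites
                    .source(json));
        }
    }

You would then hand an instance of this function to the connector's
ElasticsearchSink.
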
Exactly-once in Flink is only guaranteed for registered state. So if you have
a flatMap() with a "counter" variable that is held in a ValueState, that
counter will always be in sync with the number of elements in the Kafka topic
(because the counter is restored from the last successful checkpoint on a
failure).
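
For illustration, a minimal sketch of such a flatMap() (class and state names
are made up here); it assumes the stream was keyed with keyBy(...) upstream,
since ValueState is keyed state:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Counts the elements seen per key. Because the count lives in registered
    // state, it is checkpointed and restored after a failure, so it stays in
    // sync with the input even though records may be replayed.
    public class CountingFlatMap extends RichFlatMapFunction<String, String> {

        private transient ValueState<Long> counter;

        @Override
        public void open(Configuration parameters) {
            counter = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("counter", Long.class, 0L));
        }

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            long next = counter.value() + 1;   // restored value after a recovery
            counter.update(next);
            out.collect(value);
        }
    }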

On Tue, Feb 21, 2017 at 4:04 PM, Y. Sakamoto <phonypian...@gmail.com> wrote:

> Thank you for your reply.
>
> As I understand it, map/filter functions operate with "at least once"
> semantics when a failure occurs, so I have to write my code so that records
> are saved (overwritten) in Elasticsearch under the same ID even if duplicate
> data arrives. Is that correct?
> (Sorry, I don't understand how to "write changes to Flink's state to
> Elastic".)
>
> Regards,
> Yuichiro
>
>
> On 2017/02/21 3:56, Stephan Ewen wrote:
>
>> Hi!
>>
>> Exactly-once end-to-end requires sinks that support that kind of behavior
>> (typically some form of transaction support).
>>
>> Kafka currently does not have the mechanisms in place to support
>> exactly-once sinks, but the Kafka project is working on that feature.
>> For Elasticsearch, it is also not directly possible (because of missing
>> transactions), but you can use Flink's state as the "authoritative" state
>> (it is exactly-once) and then write the changes to Flink's state to Elastic.
>> That way the writes to Elasticsearch become "idempotent", which means
>> duplicates simply make no additional changes.
>>
>> Hope that helps!
>>
>> Stephan
>>
>> On Mon, Feb 20, 2017 at 5:53 PM, Y. Sakamoto <phonypian...@gmail.com> wrote:
>>
>>     Hi,
>>     I'm using Flink 1.2.0 and trying to achieve "exactly once" data transfer
>>     from Kafka to Elasticsearch, but I cannot.
>>     (Scala 2.11, Kafka 0.10, without YARN)
>>
>>     There are 2 Flink TaskManager nodes, and while processing with a
>>     parallelism of 2, I shut down one of them (simulating a node failure).
>>
>>     Using flink-connector-kafka, I wrote the following code:
>>
>>        StreamExecutionEnvironment env = StreamExecutionEnvironment
>>              .getExecutionEnvironment();
>>        env.enableCheckpointing(1000L);
>>        env.setParallelism(2);
>>
>>        Properties kafkaProp = new Properties();
>>        kafkaProp.setProperty("bootstrap.servers", "192.168.97.42:9092");
>>        kafkaProp.setProperty("zookeeper.connect", "192.168.97.42:2181");
>>        kafkaProp.setProperty("group.id", "id");
>>
>>        DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
>>              "topic", new SimpleStringSchema(), kafkaProp));
>>
>>     I found duplicated data being processed in my map function.
>>     Data from the checkpoint taken before the node failure seems to be duplicated.
>>
>>     Is there any way to achieve "exactly once" on failure?
>>
>>
>>     Thanks.
>>     Yuichiro
>>
>>
>>
>
> --
> ☆ ─────────────── ─ ─ - -
> Yuichiro SAKAMOTO
> ks...@muc.biglobe.ne.jp
> phonypian...@gmail.com
> http://phonypianist.sakura.ne.jp
>
>
