@Raghu: Yes, exactly, that's what I was thinking about this morning. These 
are the methods of an operator that are relevant to checkpointing:

class FlinkOperator {
  open();
  snapshotState();
  notifyCheckpointComplete();
  initializeState();
}

Input would be buffered in state, checkpointed in snapshotState(), and only 
processed once we receive notification of a completed checkpoint (which is 
sent out once all operators have signaled that checkpointing is complete). In 
case of failure, we would be re-initialized with the buffered elements in 
initializeState() and could re-process them in open().

This is somewhat expensive and leads to higher latency, so we should only do 
it if the DoFn signals that it needs deterministic input.
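
To make the buffering idea concrete, here is a minimal sketch using Flink's 
CheckpointedFunction and CheckpointListener interfaces. It is illustrative 
only, not the actual runner code: the class name, the String element type and 
the process() hook are assumptions, and a real implementation would also track 
which buffered elements belong to which checkpoint id.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public abstract class BufferingSketch
    implements CheckpointedFunction, CheckpointListener {

  // Elements received since the last completed checkpoint, withheld from the DoFn.
  private final List<String> buffer = new ArrayList<>();
  private transient ListState<String> checkpointedBuffer;

  /** Called for every incoming element: buffer it instead of processing it right away. */
  public void bufferElement(String element) {
    buffer.add(element);
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Persist the buffered input as part of the checkpoint.
    checkpointedBuffer.clear();
    for (String element : buffer) {
      checkpointedBuffer.add(element);
    }
  }

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    checkpointedBuffer =
        context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("buffered-input", String.class));
    if (context.isRestored()) {
      // After a failure we come back up with the buffered elements and can
      // re-process them, e.g. from open().
      for (String element : checkpointedBuffer.get()) {
        buffer.add(element);
      }
    }
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    // The buffered input is now covered by a completed checkpoint, so a replay
    // would see exactly these elements again; it is safe to hand them to the DoFn.
    // (A real implementation would only release elements covered by checkpointId.)
    for (String element : buffer) {
      process(element);
    }
    buffer.clear();
  }

  /** Placeholder for handing an element to the actual DoFn. */
  protected abstract void process(String element);
}

In the runner, this pattern would live inside the operator wrapping the DoFn, 
with bufferElement() called from the element-processing path.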

+Jingsong, who is working on something similar for the output produced in 
finishBundle().

> On 9. Aug 2017, at 19:41, Raghu Angadi <rang...@google.com.INVALID> wrote:
> 
> Yep, an option to ensure replays see identical input would be pretty useful.
> It might be challenging on runners that checkpoint horizontally, like Flink
> (the only way I see is to buffer all the input in state and replay it after a
> checkpoint).
> 
> On Wed, Aug 9, 2017 at 10:21 AM, Reuven Lax <re...@google.com.invalid>
> wrote:
> 
>> Please see Kenn's proposal. This is a generic thing that is lacking in the
>> Beam model, and only works today for specific runners. We should fix this
>> at the Beam level, but I don't think that should block your PR.
>> 
>> 
>> On Wed, Aug 9, 2017 at 10:10 AM, Raghu Angadi <rang...@google.com.invalid>
>> wrote:
>> 
>>> There are quite a few customers using KafkaIO with Dataflow. All of them
>>> are potential users of an exactly-once sink. The Dataflow Pubsub sink does
>>> not support EOS yet. Even among those customers, I expect the fraction of
>>> applications requiring EOS to be pretty small; that's why I don't think the
>>> extra shuffles are too expensive in overall cost yet.
>>> 
>>> It is also not clear how Flink's 2-phase commit sink function could be used
>>> in Beam's context. Beam could add some checkpoint semantics to the state API
>>> so that all the runners could support it in a platform-specific way.
>>> 
>>> Took a look at the Flink PR and commented on a few issues I see there:
>>> https://github.com/apache/flink/pull/4239. Maybe an extra shuffle, or
>>> storing all the messages in state, can get around those.
>>> 
>>> On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>> 
>>>> Yes, I think making this explicit would be good. Having a transformation
>>>> that makes assumptions about how the runner implements certain things is
>>>> not optimal. Also, I think that most people probably don't use Kafka with
>>>> the Dataflow Runner (because GCP has Pubsub, but I'm just guessing here).
>>>> This would mean that the intersection of "people who would benefit from an
>>>> exactly-once Kafka sink" and "people who use Beam on Dataflow" is rather
>>>> small, and therefore not many people would benefit from such a transform.
>>>> 
>>>> This is all just conjecture, of course.
>>>> 
>>>> Best,
>>>> Aljoscha
>>>> 
>>>>> On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID> wrote:
>>>>> 
>>>>> I think the issue we're hitting is how to write this in Beam.
>>>>> 
>>>>> Dataflow historically guaranteed checkpointing at every GBK (which, due to
>>>>> the design of Dataflow's streaming shuffle, was reasonably efficient). In
>>>>> Beam we never formalized these semantics, leaving these sinks in a gray
>>>>> area. I believe the Spark runner currently checkpoints the RDD on every
>>>>> GBK, so these unwritten semantics currently work for Dataflow and for
>>>>> Spark.
>>>>> 
>>>>> We need some way to express this operation in Beam, whether it be via an
>>>>> explicit Checkpoint() operation or via marking DoFns as having side
>>>>> effects and having the runner automatically insert such a Checkpoint in
>>>>> front of them. In Flink, this operation can be implemented using what
>>>>> Aljoscha posted.
>>>>> 
>>>>> Reuven
>>>>> 
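
As a hypothetical illustration of the barrier discussed above (not a settled 
Beam semantic), one can approximate it today by routing the input through a 
GroupByKey-backed Reshuffle before the side-effecting DoFn; whether this 
actually yields stable, replay-identical input is runner-specific (it does on 
Dataflow), which is exactly the gap Reuven describes. All names in this sketch 
are made up for the example.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class StableInputSketch {

  static PCollection<Void> writeWithBarrier(PCollection<String> records) {
    return records
        // Assign arbitrary keys so the elements can pass through a GBK-backed Reshuffle.
        .apply(WithKeys.of((String r) -> r.hashCode() % 100)
            .withKeyType(TypeDescriptors.integers()))
        // On runners that checkpoint at every GBK (e.g. Dataflow), this acts as the
        // implicit "Checkpoint()": the DoFn below then sees stable input on replay.
        .apply(Reshuffle.of())
        .apply(ParDo.of(new DoFn<KV<Integer, String>, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Side-effecting write, e.g. a transactional Kafka producer
            // (omitted; this is only a sketch).
          }
        }));
  }
}
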
>>>>> On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> In Flink, there is a TwoPhaseCommitSinkFunction that can be used for
>>>>>> such cases: [1]. The PR for a Kafka 0.11 exactly-once producer builds on
>>>>>> that: [2]
>>>>>> 
>>>>>> Best,
>>>>>> Aljoscha
>>>>>> 
>>>>>> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
>>>>>> [2] https://github.com/apache/flink/pull/4239
>>>>>>> On 3. Aug 2017, at 04:03, Raghu Angadi <rang...@google.com.INVALID> wrote:
>>>>>>> 
>>>>>>> Kafka 0.11 added support for transactions [1], which allows end-to-end
>>>>>>> exactly-once semantics. Beam's KafkaIO users can benefit from these while
>>>>>>> using runners that support exactly-once processing.
>>>>>>> 
>>>>>>> I have an implementation of EOS support for the Kafka sink:
>>>>>>> https://github.com/apache/beam/pull/3612
>>>>>>> It has two shuffles and builds on the Beam state API and the checkpoint
>>>>>>> barrier between stages (as in Dataflow). The pull request has a longer
>>>>>>> description.
>>>>>>> 
>>>>>>> - What other runners in addition to Dataflow would be compatible with
>>>>>>> such a strategy?
>>>>>>> - I think it does not quite work for Flink (as it has a global
>>>>>>> checkpoint, not one between stages). How would one go about implementing
>>>>>>> such a sink?
>>>>>>> 
>>>>>>> Any comments on the pull request are also welcome.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Raghu.
>>>>>>> 
>>>>>>> [1] https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>>> 
>> 
