@Raghu: Yes, exactly, that's what I thought about this morning, actually. These are the methods of an operator that are relevant to checkpointing:
class FlinkOperator {
    open();
    snapshotState();
    notifySnapshotComplete();
    initializeState();
}

Input would be buffered in state, checkpointed in snapshotState(), and processed when we receive notification of a complete checkpoint (which is sent out once all operators have signaled that checkpointing is complete). In case of failure, we would be re-initialized with the buffered elements in initializeState() and could re-process them in open(). This is somewhat expensive and leads to higher latency, so we should only do it if the DoFn signals that it needs deterministic input.

+Jingsong, who is working on something similar for the output produced in finishBundle().

> On 9. Aug 2017, at 19:41, Raghu Angadi <rang...@google.com.INVALID> wrote:
> 
> Yep, an option to ensure replays see identical input would be pretty useful.
> It might be challenging on horizontally checkpointing runners like Flink
> (the only way I see is to buffer all the input in state and replay it
> after a checkpoint).
> 
> On Wed, Aug 9, 2017 at 10:21 AM, Reuven Lax <re...@google.com.invalid>
> wrote:
> 
>> Please see Kenn's proposal. This is a generic thing that is lacking in
>> the Beam model, and only works today for specific runners. We should fix
>> this at the Beam level, but I don't think that should block your PR.
>> 
>> On Wed, Aug 9, 2017 at 10:10 AM, Raghu Angadi <rang...@google.com.invalid>
>> wrote:
>> 
>>> There are quite a few customers using KafkaIO with Dataflow. All of them
>>> are potential users of the exactly-once sink. The Dataflow Pubsub sink
>>> does not support EOS yet. Even among those customers, I expect the
>>> fraction of applications requiring EOS would be pretty small; that's why
>>> I don't think the extra shuffles are too expensive in overall cost yet.
>>> 
>>> It is also not clear how Flink's 2-phase commit sink function could be
>>> used in Beam's context.
>>> Beam could add some checkpoint semantics to the state-API so that all
>>> the runners could support it in a platform-specific way.
>>> 
>>> Took a look at the Flink PR, commented on a few issues I see in the
>>> comments there: https://github.com/apache/flink/pull/4239. Maybe an
>>> extra shuffle or storing all the messages in state can get over those.
>>> 
>>> On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>> 
>>>> Yes, I think making this explicit would be good. Having a
>>>> transformation that makes assumptions about how the runner implements
>>>> certain things is not optimal. Also, I think that most people probably
>>>> don't use Kafka with the Dataflow Runner (because GCP has Pubsub, but
>>>> I'm just guessing here). This would mean that the intersection of
>>>> "people who would benefit from an exactly-once Kafka sink" and "people
>>>> who use Beam on Dataflow" is rather small, and therefore not many
>>>> people would benefit from such a Transform.
>>>> 
>>>> This is all just conjecture, of course.
>>>> 
>>>> Best,
>>>> Aljoscha
>>>> 
>>>>> On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID> wrote:
>>>>> 
>>>>> I think the issue we're hitting is how to write this in Beam.
>>>>> 
>>>>> Dataflow historically guaranteed checkpointing at every GBK (which,
>>>>> due to the design of Dataflow's streaming shuffle, was reasonably
>>>>> efficient). In Beam we never formalized these semantics, leaving these
>>>>> sinks in a gray area. I believe the Spark runner currently checkpoints
>>>>> the RDD on every GBK, so these unwritten semantics currently work for
>>>>> Dataflow and for Spark.
>>>>> 
>>>>> We need some way to express this operation in Beam, whether it be via
>>>>> an explicit Checkpoint() operation or via marking DoFns as having side
>>>>> effects, and having the runner automatically insert such a Checkpoint
>>>>> in front of them. In Flink, this operation can be implemented using
>>>>> what Aljoscha posted.
>>>>> 
>>>>> Reuven
>>>>> 
>>>>> On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <aljos...@apache.org>
>>>>> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> In Flink, there is a TwoPhaseCommitSinkFunction that can be used for
>>>>>> such cases: [1]. The PR for a Kafka 0.11 exactly-once producer builds
>>>>>> on that: [2]
>>>>>> 
>>>>>> Best,
>>>>>> Aljoscha
>>>>>> 
>>>>>> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
>>>>>> [2] https://github.com/apache/flink/pull/4239
>>>>>> 
>>>>>>> On 3. Aug 2017, at 04:03, Raghu Angadi <rang...@google.com.INVALID> wrote:
>>>>>>> 
>>>>>>> Kafka 0.11 added support for transactions [1], which allows
>>>>>>> end-to-end exactly-once semantics. Beam's KafkaIO users can benefit
>>>>>>> from these while using runners that support exactly-once processing.
>>>>>>> 
>>>>>>> I have an implementation of EOS support for the Kafka sink:
>>>>>>> https://github.com/apache/beam/pull/3612
>>>>>>> It has two shuffles and builds on the Beam state-API and the
>>>>>>> checkpoint barrier between stages (as in Dataflow). The pull request
>>>>>>> has a longer description.
>>>>>>> 
>>>>>>> - What other runners in addition to Dataflow would be compatible
>>>>>>> with such a strategy?
>>>>>>> - I think it does not quite work for Flink (as it has a global
>>>>>>> checkpoint, not one between the stages). How would one go about
>>>>>>> implementing such a sink?
>>>>>>> 
>>>>>>> Any comments on the pull request are also welcome.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Raghu.
>>>>>>> 
>>>>>>> [1]
>>>>>>> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
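
The buffer-and-replay scheme Aljoscha describes at the top of the thread can be sketched as follows. This is a simplified, dependency-free model of the operator lifecycle, an assumption of how the pieces fit together: the class BufferingOperator and its method signatures are hypothetical and only mirror the names snapshotState()/notifySnapshotComplete()/initializeState() from the discussion; it is not the real Flink operator API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified model of the "buffer until the checkpoint completes" pattern.
// The method names mirror the Flink operator lifecycle discussed above,
// but this is a plain-Java sketch, not the actual Flink API.
class BufferingOperator {
    private final Queue<String> buffer = new ArrayDeque<>(); // input not yet safe to process
    private final List<String> emitted = new ArrayList<>();  // stands in for downstream output

    // On (re)start, the runner hands back whatever buffer was last checkpointed.
    void initializeState(List<String> restored) {
        buffer.addAll(restored);
    }

    // Incoming elements are only buffered, never processed immediately.
    void processElement(String element) {
        buffer.add(element);
    }

    // Checkpoint barrier: give the runner a copy of the buffer to persist.
    List<String> snapshotState() {
        return new ArrayList<>(buffer);
    }

    // Called once ALL operators have acknowledged the checkpoint; only now
    // is it safe to actually process the buffered elements.
    void notifySnapshotComplete() {
        while (!buffer.isEmpty()) {
            emitted.add(buffer.poll());
        }
    }

    List<String> output() {
        return emitted;
    }
}
```

If the job fails before notifySnapshotComplete(), a restarted instance is initialized from the last snapshot, so the replay processes exactly the input that was checkpointed; elements that arrived after the barrier were never part of the snapshot and must be replayed by the upstream source. This is the latency/cost trade-off mentioned above: no element is processed until a full checkpoint round-trip has completed.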