On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi <ang...@gmail.com> wrote:
>
> On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax <re...@google.com> wrote:
>
>> RE: Kenn's suggestion. I think Raghu looked into something like that, and something about it didn't work. I don't remember all the details, but I think there might have been some subtle problem with it that wasn't obvious. Doesn't mean that there isn't another way to solve that issue.
>
> Two disadvantages:
> - A transaction in Kafka is tied to a single producer instance. There is no official API to start a txn in one process and access it in another process. Flink's sink uses an internal REST API for this.

Can you say more about how this works?

> - There is one failure case that I mentioned earlier: if closing the transaction in the downstream transform fails, it is data loss; there is no way to replay the upstream transform that wrote the records to Kafka. With coupling of unrelated failures due to fusion, this is a severe problem.

I think I see now how 2PC affects this. From my reading, I can't see the difference in how Flink works. If the checkpoint finalization callback that does the Kafka commit fails, does it invalidate the checkpoint so that the start transaction + write elements steps are retried?

Kenn

> GBKs don't have major scalability limitations in most runners. An extra GBK is fine in practice for such a sink (at least no one has complained about it yet, though I don't know real usage numbers in practice). Flink's implementation in Beam using @RequiresStableInput does have storage requirements and latency costs that increase with the checkpoint interval. I think it is still just as useful. Good to see @RequiresStableInput support added to the Flink runner in Max's PR.
>
>> Hopefully we can make that work. Another possibility, if we can't, is to do something special for Flink. Beam allows runners to splice out well-known transforms with their own implementation. Dataflow already does that for Google Cloud Pub/Sub sources/sinks.
>> The Flink runner could splice out the Kafka sink with one that uses Flink-specific functionality. Ideally this would reuse most of the existing Kafka code (maybe we could refactor just the EOS part into something that could be subbed out).
>>
>> Reuven
>>
>> On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels <m...@apache.org> wrote:
>>
>>>> It would be interesting to see if there's something we could add to the Beam model that would create a better story for Kafka's EOS writes.
>>>
>>> There would have to be a checkpoint-completed callback the DoFn can register with the Runner. That does not seem applicable to most Runners, though.
>>>
>>>> This is true, however isn't it already true for such uses of Flink?
>>>
>>> Yes, that's correct. In the case of Kafka, Flink can offload the buffering, but for the general case, idempotent writes are only possible if we buffer data until the checkpoint is completed.
>>>
>>> On 04.03.19 17:45, Reuven Lax wrote:
>>>>
>>>> On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>>>> Can we do 2? I seem to remember that we had trouble in some cases (e.g. in the BigQuery case, there was no obvious way to create a deterministic id, which is why we went for a random number followed by a reshuffle). Also remember that the user ParDo that is producing data to the sink is not guaranteed to be deterministic; the Beam model allows for non-deterministic transforms.
>>>>>
>>>>> I believe we could use something like the worker id to make it deterministic, though the worker id can change after a restart. We could persist it in Flink's operator state. I do not know if we can come up with a Runner-independent solution.
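To see why the discussion keeps coming back to stable ids, here is a minimal Python sketch. It is not KafkaIO code: `IdempotentSink`, `assign_random_ids`, and `run` are hypothetical names, the sink de-duplicates on a single id rather than on KafkaIO's shard and sequence ids, and a failure is modeled simply as re-running the upstream step once. Drawing a random id inside the replayed step defeats de-duplication, while materializing the ids once (the role the reshuffle plays in the BigQuery case mentioned above) lets the sink filter out the replay.

```python
import uuid


class IdempotentSink:
    """Toy sink that drops any record whose id it has already written."""

    def __init__(self):
        self.seen = set()
        self.output = []

    def write(self, keyed_records):
        for record_id, record in keyed_records:
            if record_id in self.seen:
                continue  # duplicate from a replay: skip it
            self.seen.add(record_id)
            self.output.append(record)


def assign_random_ids(records):
    # Non-deterministic step: every execution draws fresh ids.
    return [(uuid.uuid4().hex, r) for r in records]


def run(records, persist_ids):
    """Write `records`, then replay the upstream step once after a failure."""
    sink = IdempotentSink()
    persisted = None
    for _attempt in range(2):  # original attempt plus one replay
        if persist_ids:
            # Like a Reshuffle/GroupByKey: ids are drawn once and materialized,
            # so the replay reads back exactly the same ids.
            if persisted is None:
                persisted = assign_random_ids(records)
            keyed = persisted
        else:
            # No stable input: the replay re-runs the random assignment.
            keyed = assign_random_ids(records)
        sink.write(keyed)
    return sink.output


print(run(["a", "b", "c"], persist_ids=True))   # ['a', 'b', 'c']
print(run(["a", "b", "c"], persist_ids=False))  # ['a', 'b', 'c', 'a', 'b', 'c']
```

A persisted worker id, as Max suggests, would play the same role as `persisted` here, but only on runners where such an id stays meaningful across restarts.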
>>>>
>>>> If we did this, we would break it on runners that don't have a concept of a stable worker id :( The Dataflow runner can load balance work at any time (including moving work around between workers).
>>>>
>>>>>> I'm not quite sure I understand. If a ParDo is marked with RequiresStableInput, can't the Flink runner buffer the input message until after the checkpoint is complete and only then deliver it to the ParDo?
>>>>>
>>>>> You're correct. I thought that it could suffice to only buffer during a checkpoint and otherwise rely on the deterministic execution of the pipeline and KafkaIO's de-duplication code.
>>>>
>>>> Yes, I want to distinguish the KafkaIO case from the general case. It would be interesting to see if there's something we could add to the Beam model that would create a better story for Kafka's EOS writes.
>>>>
>>>>> In any case, emitting only after finalization of checkpoints gives us guaranteed stable input. It also means that the processing is tied to the checkpoint interval, the checkpoint duration, and the available memory.
>>>>
>>>> This is true, however isn't it already true for such uses of Flink?
>>>>
>>>>> On 01.03.19 19:41, Reuven Lax wrote:
>>>>>>
>>>>>> On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>
>>>>>>> Fully agree. I think we can improve the situation drastically. For KafkaIO EOS with Flink we need to make these two changes:
>>>>>>>
>>>>>>> 1) Introduce buffering while the checkpoint is being taken
>>>>>>> 2) Replace the random shard id assignment with something deterministic
>>>>>>
>>>>>> Can we do 2?
>>>>>> I seem to remember that we had trouble in some cases (e.g. in the BigQuery case, there was no obvious way to create a deterministic id, which is why we went for a random number followed by a reshuffle). Also remember that the user ParDo that is producing data to the sink is not guaranteed to be deterministic; the Beam model allows for non-deterministic transforms.
>>>>>>
>>>>>>> However, we won't be able to provide full compatibility with RequiresStableInput because Flink only guarantees stable input after a checkpoint. RequiresStableInput requires input at any point in time to be stable.
>>>>>>
>>>>>> I'm not quite sure I understand. If a ParDo is marked with RequiresStableInput, can't the Flink runner buffer the input message until after the checkpoint is complete and only then deliver it to the ParDo? This adds latency of course, but I'm not sure how else to do things correctly with the Beam model.
>>>>>>
>>>>>>> IMHO the only way to achieve that is materializing output, which Flink does not currently support.
>>>>>>>
>>>>>>> KafkaIO does not need all the power of RequiresStableInput to achieve EOS with Flink, but for the general case I don't see a good solution at the moment.
>>>>>>>
>>>>>>> -Max
>>>>>>>
>>>>>>> On 01.03.19 16:45, Reuven Lax wrote:
>>>>>>>> Yeah, the person who was working on it originally stopped working on Beam, and nobody else ever finished it. I think it is important to finish though. Many of the existing Sinks are only fully correct for Dataflow today, because they generate either Reshuffle or GroupByKey to ensure input stability before outputting (in many cases this code was inherited from before Beam existed).
>>>>>>>> On Flink today, these sinks might occasionally produce duplicate output in the case of failures.
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Circling back to the RequiresStableInput annotation[1]. I've done some prototyping to see how this could be integrated into Flink. I'm currently writing a test based on RequiresStableInput.
>>>>>>>>>
>>>>>>>>> I found out there are already checks in place at the Runners to throw in case transforms use RequiresStableInput and it's not supported. However, not a single transform actually uses the annotation.
>>>>>>>>>
>>>>>>>>> It seems that the effort stopped at some point? Would it make sense to start annotating KafkaExactlyOnceSink with @RequiresStableInput? We could then get rid of the whitelist.
>>>>>>>>>
>>>>>>>>> -Max
>>>>>>>>>
>>>>>>>>> [1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>>>>>>>>
>>>>>>>>> On 01.03.19 14:28, Maximilian Michels wrote:
>>>>>>>>>> Just realized that transactions do not span multiple elements in KafkaExactlyOnceSink. So the proposed solution to stop processing elements while a snapshot is pending would work.
>>>>>>>>>>
>>>>>>>>>> It is certainly not optimal in terms of performance for Flink and poses problems when checkpoints take long to complete, but it would be worthwhile to implement this to make use of the EOS feature.
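The buffering idea discussed here (hold the input of a @RequiresStableInput ParDo until the checkpoint that contains it has completed, and only then deliver it) can be sketched as a toy Python model. This is not Flink or Beam API: `StableInputBuffer`, `snapshot`, and `notify_checkpoint_complete` are hypothetical stand-ins for runner hooks, and in a real runner the buffer would have to live in checkpointed operator state.

```python
from typing import Callable, Dict, List


class StableInputBuffer:
    """Hypothetical runner-side wrapper that holds elements until the
    checkpoint covering them has completed, then hands them to the DoFn."""

    def __init__(self, process_fn: Callable[[str], None]):
        self.process_fn = process_fn
        self.pending: List[str] = []             # arrived since the last barrier
        self.sealed: Dict[int, List[str]] = {}   # checkpoint id -> buffered elements

    def process(self, element: str) -> None:
        # Do not call the DoFn yet: the element could still be replayed.
        self.pending.append(element)

    def snapshot(self, checkpoint_id: int) -> None:
        # The barrier arrives: buffered elements become part of this snapshot.
        self.sealed[checkpoint_id] = self.pending
        self.pending = []

    def notify_checkpoint_complete(self, checkpoint_id: int) -> None:
        # The input is now stable: a restore would reproduce exactly these
        # elements. Completion of a later checkpoint also covers earlier ones.
        for cid in sorted(c for c in self.sealed if c <= checkpoint_id):
            for element in self.sealed.pop(cid):
                self.process_fn(element)


out: List[str] = []
buf = StableInputBuffer(out.append)
buf.process("a")
buf.process("b")
buf.snapshot(1)
buf.process("c")                  # belongs to the next checkpoint
buf.notify_checkpoint_complete(1)
print(out)                        # ['a', 'b']
```

Every element waits at least until the next checkpoint completes, which is why latency and buffer size grow with the checkpoint interval and duration, as noted elsewhere in the thread.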
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Max
>>>>>>>>>>
>>>>>>>>>> On 01.03.19 12:23, Maximilian Michels wrote:
>>>>>>>>>>> Thank you for the prompt replies. It's great to see that there is good understanding of how EOS in Flink works.
>>>>>>>>>>>
>>>>>>>>>>>> This is exactly what RequiresStableInput is supposed to do. On the Flink runner, this would be implemented by delaying processing until the current checkpoint is done.
>>>>>>>>>>>
>>>>>>>>>>> I don't think that works because we have no control over the Kafka transactions. Imagine:
>>>>>>>>>>>
>>>>>>>>>>> 1) ExactlyOnceWriter writes records to Kafka and commits, then starts a new transaction.
>>>>>>>>>>> 2) Flink checkpoints, delaying the processing of elements; the checkpoint fails.
>>>>>>>>>>> 3) We restore from an old checkpoint and will start writing duplicate data to Kafka. The de-duplication that the sink performs does not help, especially because the random shard ids might be assigned differently.
>>>>>>>>>>>
>>>>>>>>>>> IMHO we have to have control over commit to be able to provide EOS.
>>>>>>>>>>>
>>>>>>>>>>>> When we discussed this in Aug 2017, the understanding was that the 2 Phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context.
>>>>>>>>>>>
>>>>>>>>>>> That's also my understanding, unless we change the interface.
>>>>>>>>>>>
>>>>>>>>>>>> I don't see how SDF solves this problem.
>>>>>>>>>>>
>>>>>>>>>>> SDF has a checkpoint method which the Runner can call, but I think that you are right, that the above problem would be the same.
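Max's three-step scenario can be reduced to a toy model that shows why the commit has to be tied to checkpoint completion. The Python below is a hypothetical sketch (`TransactionalLog` and `run` are not Beam, Flink, or Kafka APIs). It assumes an aborted transaction simply disappears and that the source replays the same records after a restore, and it ignores the shard-id de-duplication the sink performs.

```python
from typing import List


class TransactionalLog:
    """Toy Kafka-like topic: only committed records are visible to readers."""

    def __init__(self) -> None:
        self.committed: List[str] = []

    def commit(self, records: List[str]) -> None:
        self.committed.extend(records)


def run(commit_before_checkpoint: bool) -> List[str]:
    log = TransactionalLog()
    inputs = ["r1", "r2"]  # records read after the last completed checkpoint

    # Attempt 1: the writer puts the records into an open transaction.
    txn = list(inputs)
    if commit_before_checkpoint:
        log.commit(txn)  # step 1: commit without waiting for the checkpoint
    # Step 2: the checkpoint fails. A commit deferred until checkpoint
    # completion never runs, and the open `txn` is aborted (simply dropped).
    # Step 3: restore from the previous checkpoint. The source replays the same
    # records, the next checkpoint succeeds, and a new transaction commits them.
    log.commit(list(inputs))
    return log.committed


print(run(commit_before_checkpoint=True))   # ['r1', 'r2', 'r1', 'r2']
print(run(commit_before_checkpoint=False))  # ['r1', 'r2']
```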
>>>>>>>>>>>
>>>>>>>>>>>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that will help many future exactly-once sinks, and address the fundamental incompatibility between the Beam model and Flink's horizontal checkpointing for such applications.
>>>>>>>>>>>
>>>>>>>>>>> Great :)
>>>>>>>>>>>
>>>>>>>>>>>> The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do.
>>>>>>>>>>>
>>>>>>>>>>> I don't think that fixes the problem. See above example.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Max
>>>>>>>>>>>
>>>>>>>>>>> On 01.03.19 00:04, Raghu Angadi wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <ang...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm not sure what a hard fail is. I probably have a shallow understanding, but doesn't @RequiresStableInput work for 2PC? The preCommit() phase should establish the transaction and commit() is not called until after checkpoint finalization. Can you describe the way that it does not work a little bit more?
>>>>>>>>>>>>>
>>>>>>>>>>>>> - preCommit() is called before the checkpoint. Kafka EOS in Flink starts the transaction before this and makes sure it flushes all records in preCommit(). So far, so good.
>>>>>>>>>>>>> - commit() is called after the checkpoint is persisted. Now, imagine commit() fails for some reason. There is no option to rerun the 1st phase to write the records again in a new transaction. This is a hard failure for the job. In practice Flink might attempt to commit again (not sure how many times), which is likely to fail and eventually results in job failure.
>>>>>>>>>>>>
>>>>>>>>>>>> In Apache Beam, the records could be stored in state, and can be written inside commit() to work around this issue. It could have scalability issues if checkpoints are not frequent enough in the Flink runner.
>>>>>>>>>>>>
>>>>>>>>>>>> Raghu.
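The difference between the two options Raghu describes can be sketched in Python. Everything here is hypothetical (`FlakyBroker`, `WriteThenCommitSink`, `BufferInStateSink`, and `commit_with_one_retry` are illustration names, not Flink's TwoPhaseCommitSinkFunction or Beam's state API), and the sketch assumes that a failed commit aborts the transaction and discards its records. Under that assumption, writing during phase 1 leaves nothing to retry with. Keeping the records in state until commit() instead lets a retry rewrite them in a new transaction.

```python
from typing import List


class FlakyBroker:
    """Toy broker whose first `failures` commit attempts raise."""

    def __init__(self, failures: int):
        self.failures = failures
        self.committed: List[str] = []

    def commit(self, records: List[str]) -> None:
        if self.failures > 0:
            self.failures -= 1
            raise RuntimeError("commit failed")
        self.committed.extend(records)


class WriteThenCommitSink:
    """Flink-style 2PC: records go into the transaction as they arrive,
    so after a failed commit they exist nowhere else."""

    def __init__(self, broker: FlakyBroker):
        self.broker = broker
        self.txn: List[str] = []

    def write(self, record: str) -> None:
        self.txn.append(record)  # phase 1: data already handed to the broker

    def commit(self) -> None:
        try:
            self.broker.commit(self.txn)
        except RuntimeError:
            self.txn = []  # assumption: the broker aborted the transaction
            raise


class BufferInStateSink:
    """Workaround: keep records in (checkpointed) state and write them only
    inside commit(), so a failed commit can be retried."""

    def __init__(self, broker: FlakyBroker):
        self.broker = broker
        self.state: List[str] = []  # stands in for runner-managed state

    def write(self, record: str) -> None:
        self.state.append(record)

    def commit(self) -> None:
        self.broker.commit(list(self.state))  # fresh transaction per attempt
        self.state = []  # cleared only after a successful commit


def commit_with_one_retry(sink) -> None:
    try:
        sink.commit()
    except RuntimeError:
        sink.commit()  # second attempt after the first commit failed
```

The trade-off, as Raghu notes, is that the buffered state grows when checkpoints are infrequent.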
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <ang...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I believe the way you would implement the logic behind Flink's KafkaProducer would be to have two steps:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. Start transaction
>>>>>>>>>>>>>>>> 2. @RequiresStableInput Close transaction
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I see. What happens if closing the transaction fails in (2)? Flink's 2PC requires that commit() should never hard fail once preCommit() succeeds. I think that is the cost of not having an extra shuffle. It is alright since this policy has worked well for Flink so far.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Overall, it will be great to have @RequiresStableInput support in the Flink runner.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Raghu.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This matches the KafkaProducer's logic - delay closing the transaction until checkpoint finalization. This answers my main question, which is "is @RequiresStableInput expressive enough to allow Beam-on-Flink to have exactly once behavior with the same performance characteristics as native Flink checkpoint finalization?"
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1] https://github.com/apache/beam/pull/7955
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <ang...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native KafkaProducer supports exactly-once. It simply commits the pending transaction once it has completed a checkpoint.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once semantics (EOS). I think it is questionable to exclude Runners from inside a transform, but I see that the intention was to save users from surprises.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native KafkaProducer supports exactly-once. It simply commits the pending transaction once it has completed a checkpoint.
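The native Flink behavior described here, where the producer commits the pending transaction only once a checkpoint has completed, can be modeled roughly as follows. This is a simplified Python sketch, not Flink's FlinkKafkaProducer or TwoPhaseCommitSinkFunction: `Broker`, `CheckpointCommittingProducer`, and their methods are hypothetical, and recovery (re-committing sealed transactions restored from a snapshot) is left out. The records themselves are buffered by the broker inside the open transaction, not by the operator.

```python
from typing import Dict, List


class Broker:
    """Toy Kafka-like broker: records in an open transaction stay invisible to
    read-committed consumers until that transaction commits."""

    def __init__(self) -> None:
        self.open: Dict[int, List[str]] = {}
        self.visible: List[str] = []
        self._next_id = 0

    def begin(self) -> int:
        self._next_id += 1
        self.open[self._next_id] = []
        return self._next_id

    def send(self, txn_id: int, record: str) -> None:
        self.open[txn_id].append(record)

    def commit(self, txn_id: int) -> None:
        self.visible.extend(self.open.pop(txn_id))


class CheckpointCommittingProducer:
    """Commits each transaction only after the checkpoint that sealed it
    has completed."""

    def __init__(self, broker: Broker) -> None:
        self.broker = broker
        self.current = broker.begin()
        self.pending: Dict[int, int] = {}  # checkpoint id -> sealed txn id

    def write(self, record: str) -> None:
        self.broker.send(self.current, record)

    def snapshot(self, checkpoint_id: int) -> None:
        # Pre-commit: seal the open transaction and record it in the snapshot,
        # then keep writing into a fresh transaction.
        self.pending[checkpoint_id] = self.current
        self.current = self.broker.begin()

    def notify_checkpoint_complete(self, checkpoint_id: int) -> None:
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            self.broker.commit(self.pending.pop(cid))
```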
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> When we discussed this in Aug 2017, the understanding was that the 2 Phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context. See this message <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in that dev thread. Has anything changed in this regard? The whole thread is relevant to this topic and worth going through.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think that TwoPhaseCommit utility class wouldn't work. The Flink runner would probably want to directly use notifySnapshotComplete in order to implement @RequiresStableInput.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> A checkpoint is realized by sending barriers through all channels, starting from the source until reaching all sinks. Every operator persists its state once it has received a barrier on all its input channels; it then forwards the barrier to the downstream operators.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds -> GroupByKey -> ExactlyOnceWriter
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> As I understood, Spark or Dataflow use the GroupByKey stages to persist the input.
>>>>>>>>>>>>>>>>>>> That is not required in Flink to be able to take a consistent snapshot of the pipeline.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Basically, for Flink we don't need any of that magic that KafkaIO does. What we would need to support EOS is a way to tell the ExactlyOnceWriter (a DoFn) to commit once a checkpoint has completed.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I know that the new version of SDF supports checkpointing, which should solve this issue. But there is still a lot of work to do to make this a reality.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I don't see how SDF solves this problem. Maybe pseudo code would make it more clear. But if it helps, that is great!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> So I think it would make sense to think about a way to make KafkaIO's EOS more accessible to Runners which support a different way of checkpointing.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that will help many future exactly-once sinks, and address the fundamental incompatibility between the Beam model and Flink's horizontal checkpointing for such applications.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Raghu.
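The checkpoint mechanism Max describes (barriers flowing from the sources to the sinks, with each operator snapshotting once it has seen the barrier on every input channel and then forwarding it) can be illustrated with a toy aligned operator. This is a simplified Python model, not Flink's implementation: `AlignedOperator` is hypothetical, it handles one checkpoint at a time, and it models "blocking" a channel as queuing that channel's records until alignment completes.

```python
from typing import Dict, List, Set, Tuple, Union

Event = Union[str, Tuple[str, int]]


class AlignedOperator:
    """Toy operator with several input channels that aligns checkpoint
    barriers before snapshotting its state."""

    def __init__(self, num_inputs: int) -> None:
        self.num_inputs = num_inputs
        self.count = 0                                   # operator state
        self.barriers_seen: Set[int] = set()
        self.blocked: Dict[int, List[str]] = {i: [] for i in range(num_inputs)}
        self.snapshots: Dict[int, int] = {}              # checkpoint id -> state
        self.downstream: List[Event] = []                # emitted records/barriers

    def on_record(self, channel: int, record: str) -> None:
        if channel in self.barriers_seen:
            # This channel is already past the barrier: hold its records back
            # so they do not leak into the current snapshot.
            self.blocked[channel].append(record)
        else:
            self._process(record)

    def on_barrier(self, channel: int, checkpoint_id: int) -> None:
        self.barriers_seen.add(channel)
        if len(self.barriers_seen) < self.num_inputs:
            return  # still waiting for barriers on other channels
        self.snapshots[checkpoint_id] = self.count           # persist state
        self.downstream.append(("barrier", checkpoint_id))   # forward barrier
        self.barriers_seen.clear()
        for channel_id in range(self.num_inputs):            # unblock inputs
            held, self.blocked[channel_id] = self.blocked[channel_id], []
            for record in held:
                self._process(record)

    def _process(self, record: str) -> None:
        self.count += 1
        self.downstream.append(record)
```

Because the snapshot only covers records that arrived before the barrier on each channel, a restore replays exactly the records after it, which is the consistency property Max contrasts with the GroupByKey-based persistence used by other runners.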
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> PS: I found this document about RequiresStableInput [3], but IMHO defining an annotation only manifests the conceptual difference between the Runners.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>>>>>>>>>>>>>>>>>> [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>>>>>>>>>>>>>>>>>> [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM