Yep, an option to ensure replays see identical input would be pretty useful.
It might be challenging on horizontally checkpointing runners like Flink
(the only way I see is to buffer all the input in state and replay it after a
checkpoint).
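
The buffering idea can be sketched roughly as follows. This is a plain Python
simulation, not Beam or Flink code: the class BufferingStage and its methods
(process, snapshot, notify_checkpoint_complete, restore) are hypothetical
names chosen for illustration. It assumes the runner calls snapshot() at a
checkpoint, then notify_checkpoint_complete() once the checkpoint is durable,
and restore() after a failure.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class BufferingStage:
    """Holds input in state and releases it only after a checkpoint completes."""

    emit: Callable[[List[str]], None]
    # Elements received since the last checkpoint (lost on failure).
    pending: List[str] = field(default_factory=list)
    # Batches frozen by a checkpoint but not yet released downstream.
    checkpointed: List[List[str]] = field(default_factory=list)

    def process(self, element: str) -> None:
        self.pending.append(element)

    def snapshot(self) -> List[List[str]]:
        # Freeze the pending elements into a batch that is part of the checkpoint.
        if self.pending:
            self.checkpointed.append(self.pending)
            self.pending = []
        return [list(batch) for batch in self.checkpointed]

    def notify_checkpoint_complete(self) -> None:
        # The batches are durable now, so a replay would produce the same batches.
        for batch in self.checkpointed:
            self.emit(batch)
        self.checkpointed = []

    def restore(self, snapshot: List[List[str]]) -> None:
        # After a failure, un-checkpointed input is dropped (the source replays it)
        # and the checkpointed batches are restored exactly as they were.
        self.pending = []
        self.checkpointed = [list(batch) for batch in snapshot]
```

Note that a batch can still be emitted more than once (for example, if the
failure happens after emitting but before the next checkpoint), so the
downstream write would still need to be idempotent or transactional. What the
buffering buys is that every replay of a batch has identical contents.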

On Wed, Aug 9, 2017 at 10:21 AM, Reuven Lax <re...@google.com.invalid>
wrote:

> Please see Kenn's proposal. This is a generic thing that is lacking in the
> Beam model, and only works today for specific runners. We should fix this
> at the Beam level, but I don't think that should block your PR.
>
>
> On Wed, Aug 9, 2017 at 10:10 AM, Raghu Angadi <rang...@google.com.invalid>
> wrote:
>
> > There are quite a few customers using KafkaIO with Dataflow. All of them
> > are potential users of an exactly-once sink. The Dataflow Pubsub sink does
> > not support EOS yet. Even among those customers, I expect the fraction of
> > applications requiring EOS to be pretty small, which is why I don't think
> > the extra shuffles are too expensive in overall cost yet.
> >
> > It is also not clear how Flink's 2-phase commit sink function could be used
> > in Beam's context. Beam could add some checkpoint semantics to the state API
> > so that all the runners could support it in a platform-specific way.
> >
> > Took a look at the Flink PR and commented there on a few issues I see:
> > https://github.com/apache/flink/pull/4239. Maybe an extra shuffle or
> > storing all the messages in state can get around those.
> >
> > On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Yes, I think making this explicit would be good. Having a transformation
> > > that makes assumptions about how the runner implements certain things is
> > > not optimal. Also, I think that most people probably don't use Kafka with
> > > the Dataflow Runner (because GCE has Pubsub, but I'm just guessing here).
> > > This would mean that the intersection of "people who would benefit from an
> > > exactly-once Kafka sink" and "people who use Beam on Dataflow" is rather
> > > small, and therefore not many people would benefit from such a Transform.
> > >
> > > This is all just conjecture, of course.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > > On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID> wrote:
> > > >
> > > > I think the issue we're hitting is how to write this in Beam.
> > > >
> > > > Dataflow historically guaranteed checkpointing at every GBK (which, due to
> > > > the design of Dataflow's streaming shuffle, was reasonably efficient). In
> > > > Beam we never formalized these semantics, leaving these sinks in a gray
> > > > area. I believe the Spark runner currently checkpoints the RDD on every
> > > > GBK, so these unwritten semantics currently work for Dataflow and for
> > > > Spark.
> > > >
> > > > We need some way to express this operation in Beam, whether it be via an
> > > > explicit Checkpoint() operation or via marking DoFns as having side
> > > > effects and having the runner automatically insert such a Checkpoint in
> > > > front of them. In Flink, this operation can be implemented using what
> > > > Aljoscha posted.
> > > >
> > > > Reuven
> > > >
> > > > On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <aljos...@apache.org>
> > > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > > >> In Flink, there is a TwoPhaseCommitSinkFunction that can be used for such
> > > > >> cases: [1]. The PR for a Kafka 0.11 exactly-once producer builds on that:
> > > > >> [2]
> > > >>
> > > >> Best,
> > > >> Aljoscha
> > > >>
> > > > >> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
> > > >> [2] https://github.com/apache/flink/pull/4239
> > > > >>> On 3. Aug 2017, at 04:03, Raghu Angadi <rang...@google.com.INVALID>
> > > > >> wrote:
> > > >>>
> > > > >>> Kafka 0.11 added support for transactions [1], which allows end-to-end
> > > > >>> exactly-once semantics. Beam's KafkaIO users can benefit from these while
> > > > >>> using runners that support exactly-once processing.
> > > >>>
> > > > >>> I have an implementation of EOS support for the Kafka sink:
> > > > >>> https://github.com/apache/beam/pull/3612
> > > > >>> It has two shuffles and builds on the Beam state API and a checkpoint
> > > > >>> barrier between stages (as in Dataflow). The pull request has a longer
> > > > >>> description.
> > > >>>
> > > > >>> - What other runners in addition to Dataflow would be compatible with such
> > > > >>> a strategy?
> > > > >>> - I think it does not quite work for Flink (as it has a global checkpoint,
> > > > >>> not one between the stages). How would one go about implementing such a
> > > > >>> sink?
> > > >>>
> > > >>> Any comments on the pull request are also welcome.
> > > >>>
> > > >>> Thanks,
> > > >>> Raghu.
> > > >>>
> > > >>> [1]
> > > >>> https://www.confluent.io/blog/exactly-once-semantics-are-
> > > >> possible-heres-how-apache-kafka-does-it/
> > > >>
> > > >>
> > >
> > >
> >
>
