Yep, an option to ensure replays see identical input would be pretty useful. It might be challenging on horizontally checkpointing runners like Flink (the only way I see is to buffer all the input in state and replay it after the checkpoint).
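For anyone skimming the thread below, here is a minimal sketch of the "stable input before a side-effecting stage" pattern being discussed (the sink DoFn and class names are hypothetical; Reshuffle is the existing Beam transform that today acts as a de-facto checkpoint on runners that materialize at shuffle boundaries, i.e. the unwritten semantics Reuven describes):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical side-effecting sink, shown only to mark where a stable-input
// guarantee is needed: a retried bundle must see exactly the same elements.
class SideEffectingWriteFn extends DoFn<String, Void> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // e.g. write c.element() to an external system within a transaction.
  }
}

class StableInputSketch {
  // On runners that checkpoint at shuffle boundaries (Dataflow, and per
  // this thread the Spark runner), the Reshuffle pins the sink's input
  // across retries. On Flink the shuffle alone gives no such guarantee,
  // hence the buffer-and-replay idea above.
  static PCollection<Void> writeWithStableInput(PCollection<String> records) {
    return records
        .apply("StableInput", Reshuffle.viaRandomKey())
        .apply("Write", ParDo.of(new SideEffectingWriteFn()));
  }
}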
On Wed, Aug 9, 2017 at 10:21 AM, Reuven Lax <re...@google.com.invalid> wrote:

> Please see Kenn's proposal. This is a generic thing that is lacking in the
> Beam model, and only works today for specific runners. We should fix this
> at the Beam level, but I don't think that should block your PR.
>
> On Wed, Aug 9, 2017 at 10:10 AM, Raghu Angadi <rang...@google.com.invalid>
> wrote:
>
> > There are quite a few customers using KafkaIO with Dataflow. All of them
> > are potential users of the exactly-once sink. The Dataflow Pubsub sink
> > does not support EOS yet. Even among those customers, I do expect the
> > fraction of applications requiring EOS would be pretty small; that's why
> > I don't think the extra shuffles are too expensive in overall cost yet.
> >
> > It is also not clear how Flink's 2-phase commit sink function could be
> > used in Beam's context. Beam could add some checkpoint semantics to the
> > state API so that all the runners could support it in a platform-specific
> > way.
> >
> > Took a look at the Flink PR and commented on a few issues I see there:
> > https://github.com/apache/flink/pull/4239. Maybe an extra shuffle or
> > storing all the messages in state can get over those.
> >
> > On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Yes, I think making this explicit would be good. Having a
> > > transformation that makes assumptions about how the runner implements
> > > certain things is not optimal. Also, I think that most people probably
> > > don't use Kafka with the Dataflow Runner (because GCE has Pubsub, but
> > > I'm just guessing here). This would mean that the intersection of
> > > "people who would benefit from an exactly-once Kafka sink" and "people
> > > who use Beam on Dataflow" is rather small, and therefore not many
> > > people would benefit from such a Transform.
> > >
> > > This is all just conjecture, of course.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > > On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID> wrote:
> > > >
> > > > I think the issue we're hitting is how to write this in Beam.
> > > >
> > > > Dataflow historically guaranteed checkpointing at every GBK (which,
> > > > due to the design of Dataflow's streaming shuffle, was reasonably
> > > > efficient). In Beam we never formalized these semantics, leaving
> > > > these syncs in a gray area. I believe the Spark runner currently
> > > > checkpoints the RDD on every GBK, so these unwritten semantics
> > > > currently work for Dataflow and for Spark.
> > > >
> > > > We need some way to express this operation in Beam, whether it be
> > > > via an explicit Checkpoint() operation or via marking DoFns as
> > > > having side effects, and having the runner automatically insert such
> > > > a Checkpoint in front of them. In Flink, this operation can be
> > > > implemented using what Aljoscha posted.
> > > >
> > > > Reuven
> > > >
> > > > On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <aljos...@apache.org>
> > > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> In Flink, there is a TwoPhaseCommitSinkFunction that can be used
> > > >> for such cases: [1]. The PR for a Kafka 0.11 exactly-once producer
> > > >> builds on that: [2]
> > > >>
> > > >> Best,
> > > >> Aljoscha
> > > >>
> > > >> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
> > > >> [2] https://github.com/apache/flink/pull/4239
> > > >>
> > > >>> On 3. Aug 2017, at 04:03, Raghu Angadi <rang...@google.com.INVALID> wrote:
> > > >>>
> > > >>> Kafka 0.11 added support for transactions [1], which allows
> > > >>> end-to-end exactly-once semantics. Beam's KafkaIO users can
> > > >>> benefit from these while using runners that support exactly-once
> > > >>> processing.
> > > >>>
> > > >>> I have an implementation of EOS support for the Kafka sink:
> > > >>> https://github.com/apache/beam/pull/3612
> > > >>> It has two shuffles and builds on the Beam state API and the
> > > >>> checkpoint barrier between stages (as in Dataflow). The pull
> > > >>> request has a longer description.
> > > >>>
> > > >>> - What other runners in addition to Dataflow would be compatible
> > > >>> with such a strategy?
> > > >>> - I think it does not quite work for Flink (as it has a global
> > > >>> checkpoint, not one between the stages). How would one go about
> > > >>> implementing such a sink?
> > > >>>
> > > >>> Any comments on the pull request are also welcome.
> > > >>>
> > > >>> Thanks,
> > > >>> Raghu.
> > > >>>
> > > >>> [1] https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
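P.S. To make the two-shuffle approach concrete for readers: below is a hedged sketch (illustrative only, not the actual code from https://github.com/apache/beam/pull/3612; all names are made up) of the sequence-numbering step. After a first shuffle keyed by shard, a stateful DoFn assigns contiguous per-shard sequence numbers; after a second shuffle, the transactional Kafka writer can skip any (shard, seqNo) at or below its last committed number.

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Illustrative sketch, not the PR's code: assigns contiguous per-shard
// sequence numbers using the Beam state API. State is keyed by shard id,
// so each shard gets its own independent counter.
class AssignSequenceNumbersFn
    extends DoFn<KV<Integer, String>, KV<Integer, KV<Long, String>>> {

  @StateId("nextSeqNo")
  private final StateSpec<ValueState<Long>> nextSeqNoSpec =
      StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext c, @StateId("nextSeqNo") ValueState<Long> nextSeqNo) {
    Long stored = nextSeqNo.read();
    long seqNo = (stored == null) ? 0L : stored;
    nextSeqNo.write(seqNo + 1);
    // Downstream, after a second shuffle on the shard id, a transactional
    // writer can dedup retries by ignoring sequence numbers it has already
    // committed.
    c.output(KV.of(c.element().getKey(), KV.of(seqNo, c.element().getValue())));
  }
}

Note the dedup only holds if this numbering step itself sees stable input on retry, which is exactly the checkpoint-between-stages question raised above.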