Yea, exactly.

On Wed, Aug 9, 2017 at 9:40 AM, Reuven Lax <re...@google.com.invalid> wrote:
> Oh, I understand now. This DoFn is saying "make my input
> deterministically replayable." If it turns out the input already is
> deterministically replayable, then nothing needs to be done.
>
> On Wed, Aug 9, 2017 at 9:10 AM, Kenneth Knowles <k...@google.com.invalid>
> wrote:
>
> > The term "determinism" refers to a property of the input PCollection,
> > not any transform or DoFn. What we mean by it is that the PCollection
> > has well-defined contents, so any transform consuming it will see
> > consistent PCollection contents across retries.
> >
> > Illustrated, I think we are talking about the same situation, where we
> > hope for an execution like this:
> >
> > Transform(s) -> Checkpoint -> WriteTransform
> >
> > In every case I know of, the purpose of the Checkpoint is so that
> > WriteTransform sees the same input across retries, even if the
> > upstream Transform(s) are not deterministic.
> >
> > So "marking DoFns as having side effects, and having the runner
> > automatically insert such a Checkpoint in front of them" is precisely
> > what you get from "requires deterministic input". Of course, there are
> > lots of kinds of side effects and they don't all require deterministic
> > input, so that's how the vocabulary developed.
> >
> > Kenn
> >
> > On Wed, Aug 9, 2017 at 8:48 AM, Reuven Lax <re...@google.com.invalid>
> > wrote:
> >
> > > Is determinism the right thing for this? One thing to keep in mind
> > > is that most inputs will not be deterministic. If any upstream
> > > aggregation is done and allowed_lateness > 0, then that aggregation
> > > is non-deterministic (basically, if it is retried it might get a
> > > slightly different set of input elements to aggregate), and so are
> > > downstream dependent values. Similarly, if an upstream aggregation
> > > uses count or processing-time triggers, the result of that
> > > aggregation will be non-deterministic.
> > > In the above cases, the property of the DoFn that requires this
> > > checkpointing is not determinism; it's the fact that the DoFn has
> > > side effects.
> > >
> > > BTW, nothing prevents us from allowing automatic inference, but
> > > _also_ adding a checkpoint operator (which will be a no-op operator
> > > for runners such as Dataflow).
> > >
> > > Reuven
> > >
> > > On Wed, Aug 9, 2017 at 8:32 AM, Kenneth Knowles
> > > <k...@google.com.invalid> wrote:
> > >
> > > > We've had a few threads related to this. There was one proposal
> > > > that seemed to achieve consensus [1]. The TL;DR is that we have to
> > > > assume any DoFn might have side effects (in the broadest sense of
> > > > the term, where anything other than a pure mathematical function
> > > > is a side effect), and when we want deterministic input we use a
> > > > special DoFn parameter, distinct from ProcessContext, to request
> > > > it, something like:
> > > >
> > > > @ProcessElement
> > > > public void process(DeterministicInput elem, OutputReceiver mainOutput) {
> > > >   ... elem.get() instead of context.element() ...
> > > >   ... mainOutput.output() instead of context.output() ...
> > > > }
> > > >
> > > > A runner can then add checkpointing if needed or elide it if not
> > > > needed. It depends on the runner's inherent checkpointing behavior
> > > > and the ability to analyze a pipeline to know whether intervening
> > > > transforms are deterministic functions.
> > > >
> > > > I started some work on breaking down
> > > > (StartBundle|Process|FinishBundle)Context to transition towards
> > > > this, but development has stalled in favor of other priorities.
> > > > I'd be happy to chat with anyone who wants to pick this up.
> > > > Kenn
> > > >
> > > > [1]
> > > > https://lists.apache.org/thread.html/ae3c838df060e47148439d1dad818d5e927b2a25ff00cc4153221dff@%3Cdev.beam.apache.org%3E
> > > >
> > > > On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek
> > > > <aljos...@apache.org> wrote:
> > > >
> > > > > Yes, I think making this explicit would be good. Having a
> > > > > transformation that makes assumptions about how the runner
> > > > > implements certain things is not optimal. Also, I think that
> > > > > most people probably don't use Kafka with the Dataflow Runner
> > > > > (because GCE has Pubsub, but I'm just guessing here). This would
> > > > > mean that the intersection of "people who would benefit from an
> > > > > exactly-once Kafka sink" and "people who use Beam on Dataflow"
> > > > > is rather small, and therefore not many people would benefit
> > > > > from such a Transform.
> > > > >
> > > > > This is all just conjecture, of course.
> > > > >
> > > > > Best,
> > > > > Aljoscha
> > > > >
> > > > > > On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID>
> > > > > > wrote:
> > > > > >
> > > > > > I think the issue we're hitting is how to write this in Beam.
> > > > > >
> > > > > > Dataflow historically guaranteed checkpointing at every GBK
> > > > > > (which, due to the design of Dataflow's streaming shuffle, was
> > > > > > reasonably efficient). In Beam we never formalized these
> > > > > > semantics, leaving these syncs in a gray area. I believe the
> > > > > > Spark runner currently checkpoints the RDD on every GBK, so
> > > > > > these unwritten semantics currently work for Dataflow and for
> > > > > > Spark.
> > > > > > We need some way to express this operation in Beam, whether it
> > > > > > be via an explicit Checkpoint() operation or via marking DoFns
> > > > > > as having side effects, and having the runner automatically
> > > > > > insert such a Checkpoint in front of them. In Flink, this
> > > > > > operation can be implemented using what Aljoscha posted.
> > > > > >
> > > > > > Reuven
> > > > > >
> > > > > > On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek
> > > > > > <aljos...@apache.org> wrote:
> > > > > >
> > > > > >> Hi,
> > > > > >>
> > > > > >> In Flink, there is a TwoPhaseCommitSinkFunction that can be
> > > > > >> used for such cases: [1]. The PR for a Kafka 0.11 exactly-once
> > > > > >> producer builds on that: [2]
> > > > > >>
> > > > > >> Best,
> > > > > >> Aljoscha
> > > > > >>
> > > > > >> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
> > > > > >> [2] https://github.com/apache/flink/pull/4239
> > > > > >>
> > > > > >>> On 3. Aug 2017, at 04:03, Raghu Angadi
> > > > > >>> <rang...@google.com.INVALID> wrote:
> > > > > >>>
> > > > > >>> Kafka 0.11 added support for transactions [1], which allows
> > > > > >>> end-to-end exactly-once semantics. Beam's KafkaIO users can
> > > > > >>> benefit from these while using runners that support
> > > > > >>> exactly-once processing.
> > > > > >>> I have an implementation of EOS support for the Kafka sink:
> > > > > >>> https://github.com/apache/beam/pull/3612
> > > > > >>> It has two shuffles and builds on the Beam state API and the
> > > > > >>> checkpoint barrier between stages (as in Dataflow). The pull
> > > > > >>> request has a longer description.
> > > > > >>>
> > > > > >>> - What other runners in addition to Dataflow would be
> > > > > >>>   compatible with such a strategy?
> > > > > >>> - I think it does not quite work for Flink (as it has a
> > > > > >>>   global checkpoint, not one between the stages). How would
> > > > > >>>   one go about implementing such a sink?
> > > > > >>>
> > > > > >>> Any comments on the pull request are also welcome.
> > > > > >>>
> > > > > >>> Thanks,
> > > > > >>> Raghu.
> > > > > >>>
> > > > > >>> [1]
> > > > > >>> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
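[Editor's note: for readers unfamiliar with the two-phase-commit pattern Aljoscha references, here is a minimal plain-Java sketch of the pre-commit/commit/abort lifecycle. All names are illustrative only; this is not Flink's actual TwoPhaseCommitSinkFunction API, just the idea behind it: buffer writes per checkpoint, flush them on the checkpoint barrier, and make them visible only once the checkpoint completes, so a retry never exposes partial output.]

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of a two-phase-commit sink (hypothetical names,
// not Flink's API). Records written between checkpoint barriers form a
// "transaction"; preCommit() flushes it when the barrier arrives, and
// commit() publishes it only after the checkpoint is acknowledged. On
// failure, abort() discards anything not yet committed, so the external
// system sees each checkpoint's output exactly once.
public class TwoPhaseCommitSketch {
    private final List<String> openTxn = new ArrayList<>();      // writes since last barrier
    private final List<String> preCommitted = new ArrayList<>(); // flushed, awaiting checkpoint ack
    private final List<String> committed = new ArrayList<>();    // durable, visible output

    public void write(String record) {
        openTxn.add(record);
    }

    // Called when a checkpoint barrier arrives: flush the open
    // transaction so it can be committed atomically later.
    public void preCommit() {
        preCommitted.addAll(openTxn);
        openTxn.clear();
    }

    // Called once the checkpoint is acknowledged as complete.
    public void commit() {
        committed.addAll(preCommitted);
        preCommitted.clear();
    }

    // Called on failure/restore: discard uncommitted state; the runner
    // will replay the input from the last successful checkpoint.
    public void abort() {
        openTxn.clear();
        preCommitted.clear();
    }

    public List<String> committedRecords() {
        return committed;
    }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.write("a");
        sink.write("b");
        sink.preCommit(); // checkpoint barrier arrives
        sink.commit();    // checkpoint completed: "a" and "b" become visible
        sink.write("c");
        sink.abort();     // failure before the next barrier: "c" is discarded
        System.out.println(sink.committedRecords()); // prints [a, b]
    }
}
```

The point of the split is that commit() can be retried after recovery without re-running the writes, which is what lets a global-checkpoint runner like Flink get exactly-once output without a per-stage checkpoint barrier.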