On Tue, Mar 3, 2020 at 9:11 AM Ismaël Mejía <ieme...@gmail.com> wrote:

> > the unification of bounded/unbounded within SplittableDoFn has always
> > been a goal.
>
> I am glad to know that my intuition is correct and that this was
> envisioned; the idea of checkpoints for bounded inputs sounds really
> useful. Eager to try that in practice.
>
> An explicit example (with a WatermarkEstimator for a bounded case) would be
> really nice to see, for learning purposes. Also, given the unification goal,
> what if we align the Bounded SDFs to have similar signatures? I mean the
> method that returns a continuation even for the Bounded case.
>

Bounded SDFs may declare either void or ProcessContinuation as the return
type of their @ProcessElement method. Unbounded SDFs must use a
ProcessContinuation as the return type. The "void" return type improves
ease of use since it is likely to be the common case for bounded SDFs.
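
A minimal, self-contained model of the two shapes, in plain Java without the Beam SDK. `ProcessContinuation` here is a hypothetical stand-in that only mimics the idea of `stop()`/`resume()`, and `RecordSource` and `ProcessShapes` are invented names for illustration; none of this is the real Beam API.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a process continuation (not Beam's class). */
final class ProcessContinuation {
  private final boolean shouldResume;

  private ProcessContinuation(boolean shouldResume) {
    this.shouldResume = shouldResume;
  }

  static ProcessContinuation stop() {
    return new ProcessContinuation(false);
  }

  static ProcessContinuation resume() {
    return new ProcessContinuation(true);
  }

  boolean shouldResume() {
    return shouldResume;
  }
}

/** Hypothetical source: poll() returns null when nothing is available right now. */
interface RecordSource {
  String poll();

  boolean isExhausted();
}

final class ProcessShapes {
  /** Bounded shape: all input is available, so process it and return nothing. */
  static void processBounded(List<String> input, Consumer<String> output) {
    for (String record : input) {
      output.accept(record);
    }
  }

  /**
   * Unbounded shape: emit whatever is available now, then tell the runner to either
   * resume later (more data may arrive) or stop (the source is finished).
   */
  static ProcessContinuation processUnbounded(RecordSource source, Consumer<String> output) {
    String record;
    while ((record = source.poll()) != null) {
      output.accept(record);
    }
    return source.isExhausted() ? ProcessContinuation.stop() : ProcessContinuation.resume();
  }
}
```

A bounded SDF that wants to checkpoint (for example, because a resource is temporarily unavailable) would use the second shape and return `resume()` instead of `void`.
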


> > Currently the watermark that is reported as part of the PollResult is
> > passed to the ProcessContext.updateWatermark [1, 2] function; instead,
> > that call would be redirected to the ManualWatermarkEstimator.setWatermark
> > function [3].
>
> Is there a JIRA for the Watch adjustments so we don't forget to integrate
> the WatermarkEstimators? I am really curious about the implementation, to
> see if I finally understand the internals of Watch too.
>

Migrating from ProcessContext#updateWatermark to WatermarkEstimators will
require updating Watch. Filed BEAM-9430.
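
A rough sketch of the redirection, modeled in plain Java rather than against the actual Watch code. `ManualEstimator`, `SimpleManualEstimator`, `PollResult`, and `WatchPollStep.reportWatermark` are hypothetical simplifications (the real Beam types use Joda `Instant` and carry more state), and the sketch assumes a poll result may or may not carry a watermark.

```java
import java.time.Instant;

/** Hypothetical manual estimator: the DoFn sets the watermark explicitly. */
interface ManualEstimator {
  void setWatermark(Instant watermark);

  Instant currentWatermark();
}

/** Minimal in-memory implementation used only for this sketch. */
final class SimpleManualEstimator implements ManualEstimator {
  private Instant watermark;

  SimpleManualEstimator(Instant initialWatermark) {
    this.watermark = initialWatermark;
  }

  @Override
  public void setWatermark(Instant watermark) {
    this.watermark = watermark;
  }

  @Override
  public Instant currentWatermark() {
    return watermark;
  }
}

/** Hypothetical poll result; watermark is null when the poll function did not report one. */
final class PollResult {
  final Instant watermark;

  PollResult(Instant watermark) {
    this.watermark = watermark;
  }
}

final class WatchPollStep {
  /** Stands in for the spot where Watch calls ProcessContext#updateWatermark today. */
  static void reportWatermark(PollResult result, ManualEstimator estimator) {
    if (result.watermark != null) {
      estimator.setWatermark(result.watermark);
    }
  }
}
```
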


> Extra question: do you think we can have a naive version of Unbounded SDF
> like the naive one we have on classical runners? (If I understood
> correctly, the current one is only for portable runners.) I worry about
> the adoption potential.
>

There is SplittableParDoViaKeyedWorkItems [1], which can be used by classical
runners, but it has limitations.

1:
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java


> On Tue, Mar 3, 2020 at 1:41 AM Robert Bradshaw <rober...@google.com>
> wrote:
> >
> > I don't have a strong preference for using a provider/having a set of
> > tightly coupled methods in Java, other than that we be consistent (and
> > we already use the methods style for restrictions).
> >
> > On Mon, Mar 2, 2020 at 3:32 PM Luke Cwik <lc...@google.com> wrote:
> > >
> > > Jan, there are some parts of Apache Beam that the watermarks package
> > > will likely rely on (@Experimental annotation, javadoc links), but
> > > fundamentally it should not rely on core, and someone could create a
> > > separate package for this.
> >
> > I think it does make sense for a set of common watermark trackers to
> > be shipped with core (e.g. manual, monotonic, and eventually a
> > probabilistic one).
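
As an illustration of the "monotonic" kind of tracker, here is a minimal sketch in plain Java, assuming an estimator that is shown every output timestamp and reports the largest one seen so far. `TimestampObserver` and `MonotonicEstimator` are hypothetical names for this example, not the classes that would actually ship with core.

```java
import java.time.Instant;

/** Hypothetical observer interface: the framework would call it for every output record. */
interface TimestampObserver {
  void observeTimestamp(Instant timestamp);

  Instant currentWatermark();
}

/**
 * Monotonic estimator sketch: the watermark is the largest output timestamp seen so far,
 * so it never moves backwards.
 */
final class MonotonicEstimator implements TimestampObserver {
  private Instant watermark;

  MonotonicEstimator(Instant initialWatermark) {
    this.watermark = initialWatermark;
  }

  @Override
  public void observeTimestamp(Instant timestamp) {
    // Only move forward; an older timestamp leaves the watermark unchanged.
    if (timestamp.isAfter(watermark)) {
      watermark = timestamp;
    }
  }

  @Override
  public Instant currentWatermark() {
    return watermark;
  }
}
```

The appeal is that nothing needs configuring, but it is only a good estimate when the source emits records in roughly increasing timestamp order; a record emitted later with an earlier timestamp would arrive behind the watermark.
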
> >
> > > Ismael, the unification of bounded/unbounded within SplittableDoFn has
> > > always been a goal. There is a set of features that BoundedSources are
> > > unlikely to use but would still be allowed to use. For example, bounded
> > > sources may want support for checkpointing, since I could foresee a
> > > BoundedSource that notices that a certain resource has become unavailable
> > > and can only process it later. The choice of which watermark estimator to
> > > use is a likely point of difference between bounded and unbounded SDFs,
> > > since bounded SDFs would be able to use a very simple estimator where the
> > > watermark is held at -infinity and only advances to +infinity once there
> > > is no more data to process. But even though unbounded SDFs are likely to
> > > be the most common users of varied watermark estimators, a bounded SDF
> > > may still want to advance the watermark as it reads records, so that
> > > runners that are more "streaming" (for example, micro-batch) can process
> > > the entire pipeline in parallel, versus other runners that execute one
> > > whole segment of the pipeline at a time.
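
To contrast the two bounded strategies just described, a minimal sketch in plain Java: given the event timestamps a bounded reader emits (assumed to be in sorted order), it lists the watermark reported after each record. `Instant.MIN`/`Instant.MAX` stand in for -infinity/+infinity here (Beam has its own minimum and maximum timestamps), and `BoundedWatermarks` is a hypothetical name.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

final class BoundedWatermarks {
  /**
   * Returns the watermark reported after each record, plus one final value once the
   * restriction is exhausted. Assumes sortedTimestamps is in non-decreasing order.
   *
   * advanceWhileReading == false: hold at "-infinity" until done (the simple estimator).
   * advanceWhileReading == true: advance to each record's timestamp as it is read, which
   * lets a more "streaming" runner start downstream work before the read finishes.
   */
  static List<Instant> reportedWatermarks(List<Instant> sortedTimestamps, boolean advanceWhileReading) {
    List<Instant> reported = new ArrayList<>();
    for (Instant timestamp : sortedTimestamps) {
      reported.add(advanceWhileReading ? timestamp : Instant.MIN);
    }
    // No more data: the watermark jumps to "+infinity" in both strategies.
    reported.add(Instant.MAX);
    return reported;
  }
}
```
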
> >
> > Put another way, the value of watermark trackers is to allow
> > processing to continue downstream before the source has completed
> > reading. This is of course essential for streaming, but if the source
> > is read to completion before downstream stages start (as is the case
> > for most batch runners) it is not needed. What this unification does
> > allow, however, is for a source to be written in such a way that it can
> > be efficiently used in both batch and streaming mode.
> >
> > > Currently the watermark that is reported as part of the PollResult is
> > > passed to the ProcessContext.updateWatermark [1, 2] function; instead,
> > > that call would be redirected to the
> > > ManualWatermarkEstimator.setWatermark function [3].
> > >
> > > 1: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
> > > 2: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
> > > 3: https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
> > >
> > > On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> > >>
> > >> I just realized that the HBaseIO example is not a good one because we
> > >> can already have Watch-like behavior, as we do for partition discovery
> > >> in HCatalogIO. Still, I am interested in your views on
> > >> bounded/unbounded unification.
> > >>
> > >> Interesting question 2: how will these annotations connect with the
> > >> Watch transform's polling patterns?
> > >> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
> > >>
> > >>
> > >> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> > >>>
> > >>> Really interesting! Implementing the watermark correctly has been a
> > >>> common struggle for IO authors, to the point that some IOs still have
> > >>> issues around it. So +1 for this, in particular if we get to reuse
> > >>> common patterns. I was not aware of Boyuan's work around this, really
> > >>> nice.
> > >>>
> > >>> One aspect I have always been confused about, since I read the SDF
> > >>> proposal documents, is whether we could have a single API for both
> > >>> Bounded and Unbounded IO by treating a BoundedSDF as a special case of
> > >>> an UnboundedSDF. Could WatermarkEstimator help in this direction?
> > >>>
> > >>> One quick case I can think of is making the current HBaseIO SDF work
> > >>> in an unbounded manner, for example to 'watch and read new tables'.
> > >>>
> > >>>
> > >>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <lc...@google.com> wrote:
> > >>>>
> > >>>> See this doc [1] and blog [2] for some context about SplittableDoFns.
> > >>>>
> > >>>> To support watermark reporting within the Java SDK for
> > >>>> SplittableDoFns, we need a way for SDF authors to report watermark
> > >>>> estimates for the element and restriction pair that they are
> > >>>> processing.
> > >>>>
> > >>>> For UnboundedSources, it was found to be a pain point to ask each
> > >>>> source author to write their own watermark estimation, which
> > >>>> typically prevented reuse. Therefore we would like to have a "library"
> > >>>> of watermark estimators that help SDF authors perform this estimation,
> > >>>> similar to how there is a "library" of restrictions and restriction
> > >>>> trackers that SDF authors can use. SDF authors for whom the existing
> > >>>> library doesn't work can add additional estimators that observe the
> > >>>> timestamps of elements, or can choose to report the watermark directly
> > >>>> through a "ManualWatermarkEstimator" parameter that can be supplied to
> > >>>> @ProcessElement methods.
> > >>>>
> > >>>> The public-facing portion of the DoFn changes adds three new
> > >>>> annotations for new DoFn-style methods:
> > >>>> GetInitialWatermarkEstimatorState: Returns the initial watermark estimator state, similar to GetInitialRestriction.
> > >>>> GetWatermarkEstimatorStateCoder: Returns a coder compatible with the watermark estimator state type, similar to GetRestrictionCoder for restrictions returned by GetInitialRestriction.
> > >>>> NewWatermarkEstimator: Returns a watermark estimator: either one that the framework invokes, allowing it to observe the timestamps of output records, or a manual watermark estimator that can be explicitly invoked to update the watermark.
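
A minimal sketch of how a runner might drive the three methods across a checkpoint, in plain Java without the Beam SDK. The method names echo the annotations above, but `ExampleSdf`, `ObservingEstimator`, the byte encoding (standing in for the coder), and `RunnerSketch.runAcrossCheckpoint` are all hypothetical; the real annotations are applied to DoFn methods with their own signatures.

```java
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.List;

/** Hypothetical estimator whose entire state is its current watermark. */
final class ObservingEstimator {
  private Instant watermark;

  ObservingEstimator(Instant state) {
    this.watermark = state;
  }

  void observeTimestamp(Instant timestamp) {
    if (timestamp.isAfter(watermark)) {
      watermark = timestamp;
    }
  }

  Instant currentWatermark() {
    return watermark;
  }

  /** The state a runner would persist at a checkpoint. */
  Instant getState() {
    return watermark;
  }
}

final class ExampleSdf {
  // Plays the role of GetInitialWatermarkEstimatorState.
  Instant getInitialWatermarkEstimatorState() {
    return Instant.EPOCH;
  }

  // Plays the role of GetWatermarkEstimatorStateCoder: encode/decode the state to bytes.
  static byte[] encodeState(Instant state) {
    return ByteBuffer.allocate(12).putLong(state.getEpochSecond()).putInt(state.getNano()).array();
  }

  static Instant decodeState(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    return Instant.ofEpochSecond(buffer.getLong(), buffer.getInt());
  }

  // Plays the role of NewWatermarkEstimator: build an estimator from (restored) state.
  ObservingEstimator newWatermarkEstimator(Instant state) {
    return new ObservingEstimator(state);
  }
}

final class RunnerSketch {
  /** Observes two batches of output timestamps with a checkpoint/resume in between. */
  static Instant runAcrossCheckpoint(ExampleSdf fn, List<Instant> before, List<Instant> after) {
    ObservingEstimator estimator = fn.newWatermarkEstimator(fn.getInitialWatermarkEstimatorState());
    before.forEach(estimator::observeTimestamp);
    // Checkpoint: persist the estimator state using the "coder".
    byte[] checkpoint = ExampleSdf.encodeState(estimator.getState());
    // Resume: a fresh estimator is created from the decoded state.
    ObservingEstimator resumed = fn.newWatermarkEstimator(ExampleSdf.decodeState(checkpoint));
    after.forEach(resumed::observeTimestamp);
    return resumed.currentWatermark();
  }
}
```

Keeping the estimator state separate from the estimator object is what lets processing stop at a checkpoint and pick up later (possibly on another worker) without the watermark jumping backwards.
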
> > >>>>
> > >>>> See [3] for an initial PR with the public-facing additions to the
> > >>>> core Java API related to SplittableDoFn.
> > >>>>
> > >>>> This mirrors a bunch of work that was done by Boyuan within the
> > >>>> Python SDK [4, 5], but in the style of the new DoFn parameter/method
> > >>>> invocation we have in the Java SDK.
> > >>>>
> > >>>> 1: https://s.apache.org/splittable-do-fn
> > >>>> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> > >>>> 3: https://github.com/apache/beam/pull/10992
> > >>>> 4: https://github.com/apache/beam/pull/9794
> > >>>> 5: https://github.com/apache/beam/pull/10375
>
