On Tue, Mar 3, 2020 at 9:11 AM Ismaël Mejía <[email protected]> wrote:
>
> > the unification of bounded/unbounded within SplittableDoFn has always been 
> > a goal.
>
> I am glad to know that my intuition is correct and that this was envisioned.
> The idea of checkpoints for bounded inputs sounds really useful. Eager to try
> that in practice.
>
> An explicit example with a WatermarkEstimator for the bounded case would be
> really nice to see, for learning purposes. Also, given the unification goal,
> what if we aligned the bounded SDFs to have similar signatures? I mean the
> method that returns a continuation, even for the bounded case.
>
> > Currently the watermark that is reported as part of the PollResult is passed
> > to the ProcessContext.updateWatermark [1, 2] function and instead that call
> > would be redirected to the ManualWatermarkEstimator.setWatermark function [3].
>
> Is there a JIRA for the Watch adjustments so we don't forget to integrate the
> WatermarkEstimators? I am really curious about the implementation, to see if I
> finally understand the internals of Watch too.
>
> Extra question: Do you think we can have a naive version of unbounded SDF, like
> the naive one we have on classical runners? (If I understood correctly, the
> current one is only for portable runners.) I worry about the adoption
> potential.

I think we should move to a world where *all* runners become portable
runners. That doesn't mean they all need to use Docker images, or
even gRPC, but I don't think having classical-only or
classical-excluded features is where we want to be long-term.

> On Tue, Mar 3, 2020 at 1:41 AM Robert Bradshaw <[email protected]> wrote:
> >
> > I don't have a strong preference for using a provider/having a set of
> > tightly coupled methods in Java, other than that we be consistent (and
> > we already use the methods style for restrictions).
> >
> > On Mon, Mar 2, 2020 at 3:32 PM Luke Cwik <[email protected]> wrote:
> > >
> > > Jan, there are some parts of Apache Beam that the watermarks package will
> > > likely rely on (@Experimental annotation, javadoc links), but
> > > fundamentally it should not rely on core, and someone could create a
> > > separate package for it.
> >
> > I think it does make sense for a set of common watermark trackers to
> > be shipped with core (e.g. manual, monotonic, and eventually a
> > probabilistic one).
> >
> > > Ismael, the unification of bounded/unbounded within SplittableDoFn has
> > > always been a goal. There is a set of features that BoundedSources are
> > > unlikely to use but would still be allowed to use. For example, bounded
> > > sources may want support for checkpointing, since I could foresee a
> > > BoundedSource that notices that a certain resource has become
> > > unavailable and can only process it later. The choice of which watermark
> > > estimator to use is a likely point of difference between bounded and
> > > unbounded SDFs, since bounded SDFs would be able to use a very simple
> > > estimator where the watermark is held at -infinity and only advances to
> > > +infinity once there is no more data to process. But even though
> > > unbounded SDFs are likely to be the most common users of varied watermark
> > > estimators, a bounded SDF may still want to advance the watermark as it
> > > reads records so that runners that are more "streaming" (for example micro
> > > batch) could process the entire pipeline in parallel, whereas other runners
> > > execute one whole segment of the pipeline at a time.
> >
> > Put another way, the value of watermark trackers is to allow
> > processing to continue downstream before the source has completed
> > reading. This is of course essential for streaming, but if the source
> > is read to completion before downstream stages start (as is the case
> > for most batch runners) it is not needed. What this unification does
> > allow, however, is for a source to be written in such a way that it can
> > be used efficiently in both batch and streaming mode.
> >
> > > Currently the watermark that is reported as part of the PollResult is 
> > > passed to the ProcessContext.updateWatermark [1, 2] function and instead 
> > > that call would be redirected to the 
> > > ManualWatermarkEstimator.setWatermark function [3].
> > >
> > > 1: 
> > > https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
> > > 2: 
> > > https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
> > > 3: 
> > > https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
> > >
> > > On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <[email protected]> wrote:
> > >>
> > >> I just realized that the HBaseIO example is not a good one because we can
> > >> already have Watch-like behavior, as we do for partition discovery in
> > >> HCatalogIO.
> > >> Still, I am interested in your views on bounded/unbounded unification.
> > >>
> > >> Interesting question 2: How will these annotations connect with the Watch
> > >> transform polling patterns?
> > >> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
> > >>
> > >>
> > >> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <[email protected]> wrote:
> > >>>
> > >>> Really interesting! Implementing the watermark correctly has been a
> > >>> common struggle for IO authors, to the point that some IOs still have
> > >>> issues around that. So +1 for this, in particular if we can get to
> > >>> reuse common patterns.
> > >>> I was not aware of Boyuan's work around this, really nice.
> > >>>
> > >>> One aspect I have always been confused about since I read the SDF
> > >>> proposal documents is whether we could get to have a single API for
> > >>> both bounded and unbounded IO by somehow assuming that a bounded SDF is
> > >>> a special case of an unbounded SDF.
> > >>> Could WatermarkEstimator help in this direction?
> > >>>
> > >>> One quick case that I can think of is to make the current HBaseIO SDF
> > >>> work in an unbounded manner, for example to 'watch and read new tables'.
> > >>>
> > >>>
> > >>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <[email protected]> wrote:
> > >>>>
> > >>>> See this doc[1] and blog[2] for some context about SplittableDoFns.
> > >>>>
> > >>>> To support watermark reporting within the Java SDK for
> > >>>> SplittableDoFns, we need a way for SDF authors to report watermark
> > >>>> estimates over the element and restriction pair that they are
> > >>>> processing.
> > >>>>
> > >>>> For UnboundedSources, it was found to be a pain point to ask each SDF 
> > >>>> author to write their own watermark estimation which typically 
> > >>>> prevented re-use. Therefore we would like to have a "library" of 
> > >>>> watermark estimators that help SDF authors perform this estimation 
> > >>>> similar to how there is a "library" of restrictions and restriction 
> > >>>> trackers that SDF authors can use. For SDF authors where the existing 
> > >>>> library doesn't work, they can add additional ones that observe 
> > >>>> timestamps of elements or choose to directly report the watermark 
> > >>>> through a "ManualWatermarkEstimator" parameter that can be supplied to 
> > >>>> @ProcessElement methods.
> > >>>>
> > >>>> The public-facing portion of the DoFn changes adds three new
> > >>>> annotations for new DoFn-style methods:
> > >>>> - GetInitialWatermarkEstimatorState: Returns the initial watermark
> > >>>>   state, similar to GetInitialRestriction.
> > >>>> - GetWatermarkEstimatorStateCoder: Returns a coder compatible with the
> > >>>>   watermark state type, similar to GetRestrictionCoder for
> > >>>>   restrictions returned by GetInitialRestriction.
> > >>>> - NewWatermarkEstimator: Returns a watermark estimator: either one that
> > >>>>   the framework invokes, allowing it to observe the timestamps of
> > >>>>   output records, or a manual watermark estimator that can be
> > >>>>   explicitly invoked to update the watermark.
> > >>>>
> > >>>> See [3] for an initial PR with the public facing additions to the core 
> > >>>> Java API related to SplittableDoFn.
> > >>>>
> > >>>> This mirrors a bunch of work that was done by Boyuan within the Python
> > >>>> SDK [4, 5] but in the style of new DoFn parameter/method invocation we
> > >>>> have in the Java SDK.
> > >>>>
> > >>>> 1: https://s.apache.org/splittable-do-fn
> > >>>> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> > >>>> 3: https://github.com/apache/beam/pull/10992
> > >>>> 4: https://github.com/apache/beam/pull/9794
> > >>>> 5: https://github.com/apache/beam/pull/10375
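
For readers following the thread, the estimator behaviours discussed above can be sketched in plain Java. This is only an illustration under stated assumptions: the interface and class names below (SketchWatermarkEstimator, HoldUntilDoneEstimator, MonotonicSketchEstimator, ManualSketchEstimator) are hypothetical and are not the Beam API, timestamps are plain long values with Long.MIN_VALUE and Long.MAX_VALUE standing in for -infinity and +infinity, and rejecting a backwards manual update is a choice made for this sketch.

```java
/** Hypothetical stand-in for illustration; NOT the Beam WatermarkEstimator API. */
interface SketchWatermarkEstimator {
    /** Current watermark estimate, as a plain long timestamp. */
    long currentWatermark();
}

/** Bounded case: hold at "-infinity" until all data is read, then jump to "+infinity". */
final class HoldUntilDoneEstimator implements SketchWatermarkEstimator {
    private boolean done = false;

    /** Called once there is no more data to process. */
    void markDone() {
        done = true;
    }

    @Override
    public long currentWatermark() {
        return done ? Long.MAX_VALUE : Long.MIN_VALUE;
    }
}

/** Observes output timestamps and never lets the estimate move backwards. */
final class MonotonicSketchEstimator implements SketchWatermarkEstimator {
    private long watermark = Long.MIN_VALUE;

    /** In this sketch the "framework" calls this for every output record. */
    void observeTimestamp(long timestamp) {
        watermark = Math.max(watermark, timestamp);
    }

    @Override
    public long currentWatermark() {
        return watermark;
    }
}

/** The caller sets the watermark explicitly (e.g. from a poll result). */
final class ManualSketchEstimator implements SketchWatermarkEstimator {
    private long watermark = Long.MIN_VALUE;

    /** Sketch assumption: a backwards update is treated as a programming error. */
    void setWatermark(long timestamp) {
        if (timestamp < watermark) {
            throw new IllegalArgumentException("watermark must not move backwards");
        }
        watermark = timestamp;
    }

    @Override
    public long currentWatermark() {
        return watermark;
    }
}

final class WatermarkSketchDemo {
    public static void main(String[] args) {
        long[] recordTimestamps = {100L, 250L, 200L};

        HoldUntilDoneEstimator bounded = new HoldUntilDoneEstimator();
        MonotonicSketchEstimator monotonic = new MonotonicSketchEstimator();
        for (long ts : recordTimestamps) {
            // The bounded estimator ignores records; the monotonic one tracks them.
            monotonic.observeTimestamp(ts);
        }
        System.out.println("bounded, mid-read: " + bounded.currentWatermark());
        bounded.markDone();
        System.out.println("bounded, finished: " + bounded.currentWatermark());
        System.out.println("monotonic: " + monotonic.currentWatermark());
    }
}
```

With the hold-until-done estimator, nothing downstream that waits on the watermark can progress until the read finishes, which matches the batch behaviour described in the thread. The monotonic and manual variants let the watermark advance while records are still being read, which is what a streaming or micro-batch runner would need.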
