On Tue, Mar 3, 2020 at 9:11 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> > the unification of bounded/unbounded within SplittableDoFn has always
> > been a goal.
>
> I am glad to know that my intuition is correct and that this was
> envisioned. The idea of checkpoints for bounded inputs sounds really
> useful. Eager to try that in practice.
>
> An explicit example with a WatermarkEstimator for a bounded case would be
> really nice to see, for learning purposes. Also, with the unification goal
> in mind, what if we aligned the bounded SDFs to have similar signatures? I
> mean the method that returns a continuation even for the bounded case.

Bounded SDFs are allowed to have a method signature which has void as the
return type OR a ProcessContinuation. Unbounded SDFs must use a
ProcessContinuation as the return type. The "void" return case improves ease
of use since it is likely to be the common case for bounded SDFs.

> > Currently the watermark that is reported as part of the PollResult is
> > passed to the ProcessContext.updateWatermark [1, 2] function and instead
> > that call would be redirected to the
> > ManualWatermarkEstimator.setWatermark function [3].
>
> Is there a JIRA for the Watch adjustments so we don't forget to integrate
> the WatermarkEstimators in? I am really curious about the implementation,
> to see if I finally understand the internals of Watch too.

Migrating from ProcessContext#updateWatermark to WatermarkEstimators will
require updating Watch. Filed BEAM-9430.

> Extra question: Do you think we can have a naive version of Unbounded SDF
> like we have the naive one on classical runners (if I understood correctly
> the current one is only for portable runners)? I worry about the adoption
> potential.

There is SplittableParDoViaKeyedWorkItems[1] that can be used by classical
runners but it has limitations.
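
As a rough illustration of the two method shapes described above (void for the common bounded case, an explicit continuation otherwise), here is a minimal, self-contained Java sketch. It deliberately does not use Beam: `Continuation`, `BoundedStyleFn`, `ResumableStyleFn`, `Driver`, and the chunk size of 3 are all hypothetical names and values invented for this example. Only the idea comes from the thread: a void return implicitly means "done", while a returned continuation lets the function ask to be resumed later.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a "stop or resume" signal; NOT Beam's ProcessContinuation.
enum Continuation { STOP, RESUME }

// Bounded style: returning void implicitly means "everything was processed, stop".
interface BoundedStyleFn {
    void process(long from, long to, List<Long> out);
}

// Resumable style: the function must say whether it is done or wants another call.
// position[0] holds the next offset to read and plays the role of a checkpoint.
interface ResumableStyleFn {
    Continuation process(long[] position, long to, List<Long> out);
}

final class Driver {
    // A void-returning function is invoked exactly once.
    static void runBounded(BoundedStyleFn fn, long from, long to, List<Long> out) {
        fn.process(from, to, out);
    }

    // A continuation-returning function is invoked until it reports STOP.
    // Returns the number of invocations that were needed.
    static int runResumable(ResumableStyleFn fn, long from, long to, List<Long> out) {
        long[] position = {from};
        int calls = 0;
        Continuation result;
        do {
            result = fn.process(position, to, out);
            calls++;
        } while (result == Continuation.RESUME);
        return calls;
    }
}

final class ContinuationSketch {
    // Emits every offset in [from, to) in a single call.
    static final BoundedStyleFn READ_ALL = (from, to, out) -> {
        for (long i = from; i < to; i++) {
            out.add(i);
        }
    };

    // Emits at most 3 offsets per call, then asks to be resumed if work remains.
    static final ResumableStyleFn READ_IN_CHUNKS = (position, to, out) -> {
        long end = Math.min(position[0] + 3, to);
        for (long i = position[0]; i < end; i++) {
            out.add(i);
        }
        position[0] = end;
        return end < to ? Continuation.RESUME : Continuation.STOP;
    };

    public static void main(String[] args) {
        List<Long> all = new ArrayList<>();
        Driver.runBounded(READ_ALL, 0, 7, all);

        List<Long> chunked = new ArrayList<>();
        int calls = Driver.runResumable(READ_IN_CHUNKS, 0, 7, chunked);

        System.out.println("single call output: " + all);
        System.out.println("chunked output:     " + chunked + " in " + calls + " calls");
    }
}
```

Both functions produce the same elements; the second one simply gives the caller a chance to stop between chunks, which is the property a bounded source would want if it needed checkpointing.
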

1: https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java

> On Tue, Mar 3, 2020 at 1:41 AM Robert Bradshaw <rober...@google.com> wrote:
> >
> > I don't have a strong preference for using a provider/having a set of
> > tightly coupled methods in Java, other than that we be consistent (and
> > we already use the methods style for restrictions).
> >
> > On Mon, Mar 2, 2020 at 3:32 PM Luke Cwik <lc...@google.com> wrote:
> > >
> > > Jan, there are some parts of Apache Beam the watermarks package will
> > > likely rely on (@Experimental annotation, javadoc links) but
> > > fundamentally should not rely on core and someone could create a
> > > separate package for this.
> >
> > I think it does make sense for a set of common watermark trackers to
> > be shipped with core (e.g. manual, monotonic, and eventually a
> > probabilistic one).
> >
> > > Ismael, the unification of bounded/unbounded within SplittableDoFn has
> > > always been a goal. There is a set of features that BoundedSources are
> > > unlikely to use but would still be allowed to use. For example,
> > > bounded sources may want to have support for checkpointing since I
> > > could foresee a BoundedSource that can notice that a certain resource
> > > becomes unavailable and can only process it later. The choice of which
> > > watermark estimator to use is a likely point of difference between
> > > bounded and unbounded SDFs since bounded SDFs would be able to use a
> > > very simple estimator where the watermark is held at -infinity and
> > > only advances to +infinity once there is no more data to process.
> > > But even though unbounded SDFs are likely to be the most common users
> > > of varied watermark estimators, a bounded SDF may still want to
> > > advance the watermark as it reads records so that runners that are
> > > more "streaming" (for example micro batch) could process the entire
> > > pipeline in parallel vs other runners that execute one whole segment
> > > of the pipeline at a time.
> >
> > Put another way, the value of watermark trackers is to allow
> > processing to continue downstream before the source has completed
> > reading. This is of course essential for streaming, but if the source
> > is read to completion before downstream stages start (as is the case
> > for most batch runners) it is not needed. What this unification does
> > allow, however, is a source to be written in such a way that it can be
> > efficiently used in both batch and streaming mode.
> >
> > > Currently the watermark that is reported as part of the PollResult is
> > > passed to the ProcessContext.updateWatermark [1, 2] function and
> > > instead that call would be redirected to the
> > > ManualWatermarkEstimator.setWatermark function [3].
> > >
> > > 1: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
> > > 2: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
> > > 3: https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
> > >
> > > On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> > >>
> > >> I just realized that the HBaseIO example is not a good one because we
> > >> can already have Watch-like behavior as we do for partition discovery
> > >> in HCatalogIO. Still I am interested in your views on
> > >> bounded/unbounded unification.
> > >>
> > >> Interesting question 2: How will these annotations connect with the
> > >> Watch transform polling patterns?
> > >> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
> > >>
> > >>
> > >> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> > >>>
> > >>> Really interesting! Implementing the watermark correctly has been a
> > >>> common struggle for IO authors, to the point that some IOs still
> > >>> have issues around that. So +1 for this, in particular if we can get
> > >>> to reuse common patterns. I was not aware of Boyuan's work around
> > >>> this, really nice.
> > >>>
> > >>> One aspect I have always been confused about since I read the SDF
> > >>> proposal documents is whether we could have a single API for both
> > >>> bounded and unbounded IO by somehow assuming that a BoundedSDF is a
> > >>> special case of an UnboundedSDF. Could WatermarkEstimator help in
> > >>> this direction?
> > >>>
> > >>> One quick case that I can think of is to make the current HBaseIO
> > >>> SDF work in an unbounded manner, for example to 'watch and read new
> > >>> tables'.
> > >>>
> > >>>
> > >>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <lc...@google.com> wrote:
> > >>>>
> > >>>> See this doc[1] and blog[2] for some context about SplittableDoFns.
> > >>>>
> > >>>> To support watermark reporting within the Java SDK for
> > >>>> SplittableDoFns, we need a way for SDF authors to report watermark
> > >>>> estimates over the element and restriction pair that they are
> > >>>> processing.
> > >>>>
> > >>>> For UnboundedSources, it was found to be a pain point to ask each
> > >>>> SDF author to write their own watermark estimation which typically
> > >>>> prevented re-use. Therefore we would like to have a "library" of
> > >>>> watermark estimators that help SDF authors perform this estimation
> > >>>> similar to how there is a "library" of restrictions and restriction
> > >>>> trackers that SDF authors can use. For SDF authors where the
> > >>>> existing library doesn't work, they can add additional ones that
> > >>>> observe timestamps of elements or choose to directly report the
> > >>>> watermark through a "ManualWatermarkEstimator" parameter that can
> > >>>> be supplied to @ProcessElement methods.
> > >>>>
> > >>>> The public facing portion of the DoFn changes adds three new
> > >>>> annotations for new DoFn style methods:
> > >>>> GetInitialWatermarkEstimatorState: Returns the initial watermark
> > >>>> state, similar to GetInitialRestriction.
> > >>>> GetWatermarkEstimatorStateCoder: Returns a coder compatible with
> > >>>> the watermark state type, similar to GetRestrictionCoder for
> > >>>> restrictions returned by GetInitialRestriction.
> > >>>> NewWatermarkEstimator: Returns a watermark estimator that either
> > >>>> the framework invokes allowing it to observe the timestamps of
> > >>>> output records or a manual watermark estimator that can be
> > >>>> explicitly invoked to update the watermark.
> > >>>>
> > >>>> See [3] for an initial PR with the public facing additions to the
> > >>>> core Java API related to SplittableDoFn.
> > >>>>
> > >>>> This mirrors a bunch of work that was done by Boyuan within the
> > >>>> Python SDK [4, 5] but in the style of new DoFn parameter/method
> > >>>> invocation we have in the Java SDK.
> > >>>>
> > >>>> 1: https://s.apache.org/splittable-do-fn
> > >>>> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> > >>>> 3: https://github.com/apache/beam/pull/10992
> > >>>> 4: https://github.com/apache/beam/pull/9794
> > >>>> 5: https://github.com/apache/beam/pull/10375
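
For readers who want something concrete, the estimator kinds mentioned in this thread (a manual one, a monotonic one that observes output timestamps, and the simple bounded one that holds at -infinity until the input is exhausted) can be sketched roughly as follows. This is a standalone Java illustration and not Beam's API: the `Estimator` interface, the three class names, and the use of `long` milliseconds with `Long.MIN_VALUE`/`Long.MAX_VALUE` standing in for -infinity/+infinity are all assumptions made for this example. The constructor argument plays the role of the initial watermark estimator state.

```java
// Hypothetical stand-in for illustration only; NOT Beam's WatermarkEstimator interface.
interface Estimator {
    long currentWatermark(); // assumed unit: milliseconds since the epoch
}

// The author reports the watermark explicitly, e.g. from a poll result.
final class ManualEstimator implements Estimator {
    private long watermark;

    ManualEstimator(long initialState) {
        this.watermark = initialState;
    }

    // Assumption of this sketch: a watermark is never allowed to move backwards.
    void setWatermark(long newWatermark) {
        if (newWatermark < watermark) {
            throw new IllegalArgumentException("watermark must not move backwards");
        }
        watermark = newWatermark;
    }

    @Override
    public long currentWatermark() {
        return watermark;
    }
}

// The framework feeds in the timestamp of every output record.
// This is only a valid watermark if output timestamps are non-decreasing;
// Math.max merely prevents the reported value from regressing.
final class MonotonicEstimator implements Estimator {
    private long watermark;

    MonotonicEstimator(long initialState) {
        this.watermark = initialState;
    }

    void observeTimestamp(long timestamp) {
        watermark = Math.max(watermark, timestamp);
    }

    @Override
    public long currentWatermark() {
        return watermark;
    }
}

// Bounded case: hold at "-infinity" while reading, jump to "+infinity" when done.
final class HoldUntilDoneEstimator implements Estimator {
    private boolean done;

    void markDone() {
        done = true;
    }

    @Override
    public long currentWatermark() {
        return done ? Long.MAX_VALUE : Long.MIN_VALUE;
    }
}

final class EstimatorSketch {
    public static void main(String[] args) {
        ManualEstimator manual = new ManualEstimator(Long.MIN_VALUE);
        manual.setWatermark(1_000L);

        MonotonicEstimator monotonic = new MonotonicEstimator(Long.MIN_VALUE);
        monotonic.observeTimestamp(10L);
        monotonic.observeTimestamp(20L);

        HoldUntilDoneEstimator bounded = new HoldUntilDoneEstimator();
        long whileReading = bounded.currentWatermark();
        bounded.markDone();

        System.out.println("manual:    " + manual.currentWatermark());
        System.out.println("monotonic: " + monotonic.currentWatermark());
        System.out.println("bounded:   " + whileReading + " then " + bounded.currentWatermark());
    }
}
```

The bounded estimator is the one a batch-only source could use unchanged, while a source that wants downstream stages to make progress before reading finishes would pick one of the other two.
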