On Tue, Mar 3, 2020 at 9:11 AM Ismaël Mejía <[email protected]> wrote:
>
> > the unification of bounded/unbounded within SplittableDoFn has always been
> > a goal.
>
> I am glad to know that my intuition is correct and that this was envisioned.
> The idea of checkpoints for bounded inputs sounds really useful. Eager to
> try that in practice.
>
> An explicit example (with a WatermarkEstimator for a bounded case) would be
> really nice to see, for learning purposes. Also, with the unification goal,
> what if we then align the Bounded SDFs to have similar signatures? I mean
> the method that returns a continuation even for the Bounded case.
>
> > Currently the watermark that is reported as part of the PollResult is passed
> > to the ProcessContext.updateWatermark [1, 2] function and instead that call
> > would be redirected to the ManualWatermarkEstimator.setWatermark function
> > [3].
>
> Is there a JIRA for the Watch adjustments so we don't forget to integrate the
> WatermarkEstimators? I am really curious about the implementation, to see if
> I finally understand the internals of Watch too.
>
> Extra question: Do you think we can have a naive version of Unbounded SDF like
> the naive one we have on classical runners? (If I understood correctly, the
> current one is only for portable runners.) I worry about the adoption
> potential.

I think we should move to a world where *all* runners become portable
runners. That doesn't mean they all need to use Docker images, or even
gRPC, but I don't think having classical-only or classical-excluded
features is where we want to be long-term.

> On Tue, Mar 3, 2020 at 1:41 AM Robert Bradshaw <[email protected]> wrote:
> >
> > I don't have a strong preference for using a provider/having a set of
> > tightly coupled methods in Java, other than that we be consistent (and
> > we already use the methods style for restrictions).
> >
> > On Mon, Mar 2, 2020 at 3:32 PM Luke Cwik <[email protected]> wrote:
> > >
> > > Jan, there are some parts of Apache Beam the watermarks package will
> > > likely rely on (@Experimental annotation, javadoc links) but
> > > fundamentally should not rely on core and someone could create a separate
> > > package for this.
> >
> > I think it does make sense for a set of common watermark trackers to
> > be shipped with core (e.g. manual, monotonic, and eventually a
> > probabilistic one).
> >
> > > Ismael, the unification of bounded/unbounded within SplittableDoFn has
> > > always been a goal. There is a set of features that BoundedSources are
> > > unlikely to use but would still be allowed to use. For example,
> > > bounded sources may want to have support for checkpointing since I could
> > > foresee a BoundedSource that can notice that a certain resource becomes
> > > unavailable and can only process it later. The choice of which watermark
> > > estimator to use is a likely point of difference between bounded and
> > > unbounded SDFs since bounded SDFs would be able to use a very simple
> > > estimator where the watermark is held at -infinity and only advances to
> > > +infinity once there is no more data to process.
> > > But even though unbounded SDFs are likely to be the most common users
> > > of varied watermark estimators, a bounded SDF may still want to advance
> > > the watermark as it reads records so that runners that are more
> > > "streaming" (for example micro batch) could process the entire pipeline
> > > in parallel vs other runners that execute one whole segment of the
> > > pipeline at a time.
> >
> > Put another way, the value of watermark trackers is to allow
> > processing to continue downstream before the source has completed
> > reading. This is of course essential for streaming, but if the source
> > is read to completion before downstream stages start (as is the case
> > for most batch runners) it is not needed. What this unification does
> > allow, however, is a source to be written in such a way that it can be
> > efficiently used in both batch and streaming mode.
> >
> > > Currently the watermark that is reported as part of the PollResult is
> > > passed to the ProcessContext.updateWatermark [1, 2] function and instead
> > > that call would be redirected to the
> > > ManualWatermarkEstimator.setWatermark function [3].
> > >
> > > 1: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
> > > 2: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
> > > 3: https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
> > >
> > > On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <[email protected]> wrote:
> > >>
> > >> I just realized that the HBaseIO example is not a good one because we can
> > >> already have Watch-like behavior as we do for partition discovery in
> > >> HCatalogIO.
> > >> Still, I am interested in your views on bounded/unbounded unification.
> > >>
> > >> Interesting question 2: How will these annotations connect with the Watch
> > >> transform polling patterns?
> > >> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
> > >>
> > >>
> > >> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <[email protected]> wrote:
> > >>>
> > >>> Really interesting! Implementing the watermark correctly has been a
> > >>> common struggle for IO authors, to the point that some IOs still have
> > >>> issues around that. So +1 for this, in particular if we can get to
> > >>> reuse common patterns. I was not aware of Boyuan's work around this,
> > >>> really nice.
> > >>>
> > >>> One aspect I have always been confused about since I read the SDF
> > >>> proposal documents is whether we could get to have a single API for
> > >>> both Bounded and Unbounded IO by somehow assuming that a BoundedSDF is
> > >>> a special case of an UnboundedSDF. Could WatermarkEstimator help in
> > >>> this direction?
> > >>>
> > >>> One quick case that I can think of is to make the current HBaseIO SDF
> > >>> work in an unbounded manner, for example to 'watch and read new tables'.
> > >>>
> > >>>
> > >>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <[email protected]> wrote:
> > >>>>
> > >>>> See this doc[1] and blog[2] for some context about SplittableDoFns.
> > >>>>
> > >>>> To support watermark reporting within the Java SDK for
> > >>>> SplittableDoFns, we need a way for SDF authors to report watermark
> > >>>> estimates over the element and restriction pair that they are
> > >>>> processing.
> > >>>>
> > >>>> For UnboundedSources, it was found to be a pain point to ask each SDF
> > >>>> author to write their own watermark estimation, which typically
> > >>>> prevented re-use.
> > >>>> Therefore we would like to have a "library" of watermark estimators
> > >>>> that help SDF authors perform this estimation, similar to how there
> > >>>> is a "library" of restrictions and restriction trackers that SDF
> > >>>> authors can use. SDF authors for whom the existing library doesn't
> > >>>> work can add additional estimators that observe timestamps of
> > >>>> elements, or choose to directly report the watermark through a
> > >>>> "ManualWatermarkEstimator" parameter that can be supplied to
> > >>>> @ProcessElement methods.
> > >>>>
> > >>>> The public facing portion of the DoFn changes adds three new
> > >>>> annotations for new DoFn style methods:
> > >>>>
> > >>>> GetInitialWatermarkEstimatorState: Returns the initial watermark
> > >>>>   state, similar to GetInitialRestriction.
> > >>>> GetWatermarkEstimatorStateCoder: Returns a coder compatible with
> > >>>>   the watermark state type, similar to GetRestrictionCoder for
> > >>>>   restrictions returned by GetInitialRestriction.
> > >>>> NewWatermarkEstimator: Returns a watermark estimator that either the
> > >>>>   framework invokes, allowing it to observe the timestamps of output
> > >>>>   records, or a manual watermark estimator that can be explicitly
> > >>>>   invoked to update the watermark.
> > >>>>
> > >>>> See [3] for an initial PR with the public facing additions to the core
> > >>>> Java API related to SplittableDoFn.
> > >>>>
> > >>>> This mirrors a bunch of work that was done by Boyuan within the Python
> > >>>> SDK [4, 5] but in the style of new DoFn parameter/method invocation we
> > >>>> have in the Java SDK.
> > >>>>
> > >>>> 1: https://s.apache.org/splittable-do-fn
> > >>>> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> > >>>> 3: https://github.com/apache/beam/pull/10992
> > >>>> 4: https://github.com/apache/beam/pull/9794
> > >>>> 5: https://github.com/apache/beam/pull/10375
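
To make the estimator kinds discussed in this thread concrete, here is a rough
plain-Java sketch of three of them: a manual one, a "monotonic" one that
observes output timestamps, and the very simple bounded one that holds the
watermark at -infinity until there is no more data. It deliberately does not
use the Beam SDK. Every type and method name below is a hypothetical stand-in
(only setWatermark echoes a name mentioned above), Instant.MIN and Instant.MAX
stand in for -infinity and +infinity, and rejecting a regressing manual
watermark is an assumption of this sketch rather than a statement about the PR.

```java
import java.time.Instant;

/**
 * Illustrative sketch only. Every type here is a hypothetical stand-in
 * and NOT the Beam SDK API proposed in the PR.
 */
final class WatermarkSketch {

  /** Reports the current watermark estimate (method name is an assumption). */
  interface WatermarkEstimator {
    Instant currentWatermark();
  }

  /** Variant a framework would drive by showing it each output timestamp. */
  interface TimestampObservingEstimator extends WatermarkEstimator {
    void observeTimestamp(Instant timestamp);
  }

  /** Variant the SDF author updates explicitly, e.g. from a poll result. */
  interface ManualEstimator extends WatermarkEstimator {
    void setWatermark(Instant watermark);
  }

  /** "Monotonic" tracker: the watermark is the largest timestamp seen so far. */
  static final class Monotonic implements TimestampObservingEstimator {
    private Instant watermark = Instant.MIN;

    @Override
    public void observeTimestamp(Instant timestamp) {
      if (timestamp.isAfter(watermark)) {
        watermark = timestamp;
      }
    }

    @Override
    public Instant currentWatermark() {
      return watermark;
    }
  }

  /** Manual tracker: this sketch assumes a regressing watermark is an error. */
  static final class Manual implements ManualEstimator {
    private Instant watermark = Instant.MIN;

    @Override
    public void setWatermark(Instant newWatermark) {
      if (newWatermark.isBefore(watermark)) {
        throw new IllegalArgumentException("watermark must not regress: " + newWatermark);
      }
      watermark = newWatermark;
    }

    @Override
    public Instant currentWatermark() {
      return watermark;
    }
  }

  /** Simple bounded case: hold at "-infinity", jump to "+infinity" when done. */
  static final class HoldUntilDone implements WatermarkEstimator {
    private boolean done;

    void markDone() {
      done = true;
    }

    @Override
    public Instant currentWatermark() {
      return done ? Instant.MAX : Instant.MIN;
    }
  }

  public static void main(String[] args) {
    HoldUntilDone bounded = new HoldUntilDone();
    System.out.println("bounded, still reading: " + bounded.currentWatermark());
    bounded.markDone();
    System.out.println("bounded, finished:      " + bounded.currentWatermark());

    Monotonic monotonic = new Monotonic();
    monotonic.observeTimestamp(Instant.ofEpochMilli(1_000));
    monotonic.observeTimestamp(Instant.ofEpochMilli(500)); // late record, ignored
    System.out.println("monotonic:              " + monotonic.currentWatermark());
  }
}
```

With HoldUntilDone, downstream stages cannot make progress on event time until
the source finishes, which is fine for a batch runner that reads the source to
completion first. Swapping in the Monotonic or Manual tracker is what would let
a more "streaming" runner process downstream stages while the same source is
still being read.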
