On Wed, Mar 4, 2020 at 7:36 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> > I think we should move to a world where *all* runners become portable
> > runners. That doesn't mean they all need to use Docker images, or even
> > gRPC, but I don't think having classical-only or classical-excluded
> > features is where we want to be long-term.
>
> Robert, I agree 100% with you. I dream of the day when classic runners no
> longer exist and we do not have issues like this one (features that are
> not available). However, there are still two requirements before we can
> abandon them: (1) that the performance overhead is not considerably
> bigger for existing users (in particular Java users) and (2) that the
> portability abstractions are mature. We are getting there, but we are
> not there yet.
>

Dataflow and its internal counterpart (Flume) have had good experience
running Go and Python portable pipelines at the same or better performance
than the closest non-portable equivalent. Java has been less of a focus
since it is the most mature non-portable implementation, but hopefully it
will move in that direction soon.

This would be a great time for any contributors who are interested in
specific runners to help migrate them to portable implementations.

> On Tue, Mar 3, 2020 at 6:57 PM Robert Bradshaw <rober...@google.com> wrote:
> >
> > On Tue, Mar 3, 2020 at 9:11 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> > >
> > > > the unification of bounded/unbounded within SplittableDoFn has
> > > > always been a goal.
> > >
> > > I am glad to know that my intuition is correct and that this was
> > > envisioned. The idea of checkpoints for bounded inputs sounds really
> > > useful. Eager to try that in practice.
> > >
> > > An explicit example (with a WatermarkEstimator for a bounded case)
> > > would be really nice to see, for learning purposes. Also, given the
> > > unification goal, what if we aligned the Bounded SDFs to have similar
> > > signatures? I mean the method that returns a continuation even for
> > > the Bounded case.
> > >
> > > > Currently the watermark that is reported as part of the PollResult
> > > > is passed to the ProcessContext.updateWatermark [1, 2] function and
> > > > instead that call would be redirected to the
> > > > ManualWatermarkEstimator.setWatermark function [3].
> > >
> > > Is there a JIRA for the Watch adjustments so we don't forget to
> > > integrate the WatermarkEstimators? I am really curious about the
> > > implementation, to see if I finally understand the internals of
> > > Watch too.
> > >
> > > Extra question: Do you think we can have a naive version of Unbounded
> > > SDF, like the naive one we have on classical runners (if I understood
> > > correctly, the current one is only for portable runners)? I worry
> > > about the adoption potential.
> >
> > I think we should move to a world where *all* runners become portable
> > runners. That doesn't mean they all need to use Docker images, or
> > even gRPC, but I don't think having classical-only or
> > classical-excluded features is where we want to be long-term.
> >
> > > On Tue, Mar 3, 2020 at 1:41 AM Robert Bradshaw <rober...@google.com> wrote:
> > > >
> > > > I don't have a strong preference for using a provider/having a set of
> > > > tightly coupled methods in Java, other than that we be consistent (and
> > > > we already use the methods style for restrictions).
> > > >
> > > > On Mon, Mar 2, 2020 at 3:32 PM Luke Cwik <lc...@google.com> wrote:
> > > > >
> > > > > Jan, there are some parts of Apache Beam the watermarks package
> > > > > will likely rely on (@Experimental annotation, javadoc links), but
> > > > > fundamentally it should not rely on core, and someone could create
> > > > > a separate package for this.
> > > >
> > > > I think it does make sense for a set of common watermark trackers to
> > > > be shipped with core (e.g. manual, monotonic, and eventually a
> > > > probabilistic one).
> > > >
> > > > > Ismael, the unification of bounded/unbounded within SplittableDoFn
> > > > > has always been a goal.
> > > > > There is a set of features that BoundedSources are unlikely to use
> > > > > but would still be allowed to use. For example, bounded sources may
> > > > > want support for checkpointing, since I could foresee a
> > > > > BoundedSource that notices that a certain resource has become
> > > > > unavailable and can only process it later. The choice of which
> > > > > watermark estimator to use is a likely point of difference between
> > > > > bounded and unbounded SDFs, since bounded SDFs would be able to use
> > > > > a very simple estimator where the watermark is held at -infinity
> > > > > and only advances to +infinity once there is no more data to
> > > > > process. But even though unbounded SDFs are likely to be the most
> > > > > common users of varied watermark estimators, a bounded SDF may
> > > > > still want to advance the watermark as it reads records so that
> > > > > runners that are more "streaming" (for example micro batch) could
> > > > > process the entire pipeline in parallel, vs. other runners that
> > > > > execute one whole segment of the pipeline at a time.
> > > >
> > > > Put another way, the value of watermark trackers is to allow
> > > > processing to continue downstream before the source has completed
> > > > reading. This is of course essential for streaming, but if the source
> > > > is read to completion before downstream stages start (as is the case
> > > > for most batch runners) it is not needed. What this unification does
> > > > allow, however, is a source to be written in such a way that it can
> > > > be efficiently used in both batch and streaming mode.
> > > >
> > > > > Currently the watermark that is reported as part of the PollResult
> > > > > is passed to the ProcessContext.updateWatermark [1, 2] function and
> > > > > instead that call would be redirected to the
> > > > > ManualWatermarkEstimator.setWatermark function [3].
> > > > >
> > > > > 1: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
> > > > > 2: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
> > > > > 3: https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
> > > > >
> > > > > On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> > > > >>
> > > > >> I just realized that the HBaseIO example is not a good one because
> > > > >> we can already have Watch-like behavior, as we do for partition
> > > > >> discovery in HCatalogIO. Still, I am interested in your views on
> > > > >> bounded/unbounded unification.
> > > > >>
> > > > >> Interesting question 2: How will these annotations connect with the
> > > > >> Watch transform's polling patterns?
> > > > >> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
> > > > >>
> > > > >>
> > > > >> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> > > > >>>
> > > > >>> Really interesting! Implementing the watermark correctly has been
> > > > >>> a common struggle for IO authors, to the point that some IOs still
> > > > >>> have issues around that. So +1 for this, in particular if we can
> > > > >>> get to reuse common patterns. I was not aware of Boyuan's work
> > > > >>> around this, really nice.
> > > > >>>
> > > > >>> One aspect I have always been confused about since I read the SDF
> > > > >>> proposal documents is whether we could have a single API for both
> > > > >>> Bounded and Unbounded IO by somehow assuming that a BoundedSDF is
> > > > >>> a special case of an UnboundedSDF.
> > > > >>> Could WatermarkEstimator help in this direction?
> > > > >>>
> > > > >>> One quick case that I can think of is to make the current HBaseIO
> > > > >>> SDF work in an unbounded manner, for example to 'watch and read
> > > > >>> new tables'.
> > > > >>>
> > > > >>>
> > > > >>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <lc...@google.com> wrote:
> > > > >>>>
> > > > >>>> See this doc [1] and blog [2] for some context about
> > > > >>>> SplittableDoFns.
> > > > >>>>
> > > > >>>> To support watermark reporting within the Java SDK for
> > > > >>>> SplittableDoFns, we need a way for SDF authors to report
> > > > >>>> watermark estimates over the element and restriction pair that
> > > > >>>> they are processing.
> > > > >>>>
> > > > >>>> For UnboundedSources, it was found to be a pain point to ask each
> > > > >>>> SDF author to write their own watermark estimation, which
> > > > >>>> typically prevented re-use. Therefore we would like to have a
> > > > >>>> "library" of watermark estimators that help SDF authors perform
> > > > >>>> this estimation, similar to how there is a "library" of
> > > > >>>> restrictions and restriction trackers that SDF authors can use.
> > > > >>>> SDF authors for whom the existing library doesn't work can add
> > > > >>>> additional estimators that observe timestamps of elements, or
> > > > >>>> choose to directly report the watermark through a
> > > > >>>> "ManualWatermarkEstimator" parameter that can be supplied to
> > > > >>>> @ProcessElement methods.
> > > > >>>>
> > > > >>>> The public facing portion of the DoFn changes adds three new
> > > > >>>> annotations for new DoFn style methods:
> > > > >>>> GetInitialWatermarkEstimatorState: Returns the initial watermark
> > > > >>>> state, similar to GetInitialRestriction.
> > > > >>>> GetWatermarkEstimatorStateCoder: Returns a coder compatible with
> > > > >>>> the watermark state type, similar to GetRestrictionCoder for
> > > > >>>> restrictions returned by GetInitialRestriction.
> > > > >>>> NewWatermarkEstimator: Returns a watermark estimator, either one
> > > > >>>> that the framework invokes, allowing it to observe the timestamps
> > > > >>>> of output records, or a manual watermark estimator that can be
> > > > >>>> explicitly invoked to update the watermark.
> > > > >>>>
> > > > >>>> See [3] for an initial PR with the public facing additions to the
> > > > >>>> core Java API related to SplittableDoFn.
> > > > >>>>
> > > > >>>> This mirrors a bunch of work that was done by Boyuan within the
> > > > >>>> Python SDK [4, 5] but in the style of new DoFn parameter/method
> > > > >>>> invocation we have in the Java SDK.
> > > > >>>>
> > > > >>>> 1: https://s.apache.org/splittable-do-fn
> > > > >>>> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> > > > >>>> 3: https://github.com/apache/beam/pull/10992
> > > > >>>> 4: https://github.com/apache/beam/pull/9794
> > > > >>>> 5: https://github.com/apache/beam/pull/10375
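
For readers following the thread, the three estimator styles discussed
above (one that observes output timestamps, a manual one, and the simple
bounded one held at -infinity until the input is exhausted) can be sketched
roughly as below. This is only an illustration under stated assumptions and
not the Beam API: every name here (WatermarkSketch, Estimator,
MonotonicEstimator, ManualEstimator, BoundedHoldEstimator, observeTimestamp,
markDone) is hypothetical, timestamps are plain long values, and
Long.MIN_VALUE / Long.MAX_VALUE stand in for -infinity / +infinity. Rejecting
a backwards-moving manual watermark is a choice made for this sketch, not a
statement about how ManualWatermarkEstimator behaves.

```java
// Hypothetical sketch only: none of these types are part of Apache Beam.
final class WatermarkSketch {
    static final long MIN_TS = Long.MIN_VALUE; // stands in for -infinity
    static final long MAX_TS = Long.MAX_VALUE; // stands in for +infinity

    /** Minimal contract assumed for this sketch: report the current watermark. */
    interface Estimator {
        long currentWatermark();
    }

    /** Observes output timestamps; the watermark is the largest one seen so far. */
    static final class MonotonicEstimator implements Estimator {
        private long watermark;

        MonotonicEstimator(long initialState) {
            this.watermark = initialState;
        }

        void observeTimestamp(long timestamp) {
            // Taking the max means late or out-of-order records never move it back.
            watermark = Math.max(watermark, timestamp);
        }

        @Override
        public long currentWatermark() {
            return watermark;
        }
    }

    /** The author of the processing code sets the watermark explicitly. */
    static final class ManualEstimator implements Estimator {
        private long watermark;

        ManualEstimator(long initialState) {
            this.watermark = initialState;
        }

        void setWatermark(long timestamp) {
            // Sketch-level assumption: refuse to move the watermark backwards.
            if (timestamp < watermark) {
                throw new IllegalArgumentException("watermark must not move backwards");
            }
            watermark = timestamp;
        }

        @Override
        public long currentWatermark() {
            return watermark;
        }
    }

    /** Bounded case: hold at -infinity, jump to +infinity once the input is exhausted. */
    static final class BoundedHoldEstimator implements Estimator {
        private boolean done;

        void markDone() {
            done = true;
        }

        @Override
        public long currentWatermark() {
            return done ? MAX_TS : MIN_TS;
        }
    }

    public static void main(String[] args) {
        MonotonicEstimator monotonic = new MonotonicEstimator(MIN_TS);
        for (long ts : new long[] {10L, 30L, 20L}) {
            monotonic.observeTimestamp(ts);
        }
        System.out.println("monotonic: " + monotonic.currentWatermark());

        ManualEstimator manual = new ManualEstimator(MIN_TS);
        manual.setWatermark(42L);
        System.out.println("manual: " + manual.currentWatermark());

        BoundedHoldEstimator bounded = new BoundedHoldEstimator();
        System.out.println("bounded before done: " + bounded.currentWatermark());
        bounded.markDone();
        System.out.println("bounded after done: " + bounded.currentWatermark());
    }
}
```

The initialState constructor argument plays the role that the initial
watermark estimator state plays in the proposal: it is the value a runner
would need to checkpoint and restore. A bounded source that wants downstream
stages to make progress while it is still reading could use the monotonic
style instead of the hold-at-minimum one.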