On Wed, Mar 4, 2020 at 7:36 AM Ismaël Mejía <ieme...@gmail.com> wrote:

> > I think we should move to a world where *all* runners become portable
> > runners. That doesn't mean they all need to use docker images, or even
> > GRPC, but I don't think having classical-only or classical-excluded
> > features is where we want to be long-term.
>
> Robert, I agree 100% with you. I dream of the day when classic runners no
> longer exist and we do not have issues like this one (unavailable
> features). However, there are still two requirements before we can abandon
> them: (1) that the performance overhead is not considerably bigger for
> existing users (in particular Java users) and (2) that the portability
> abstractions are mature. We are getting there, but we are not there yet.
>

Dataflow and its internal counterpart (Flume) have had good experience
running Go and Python portable pipelines at the same or better performance
than the closest non-portable equivalent. Java has been less of a focus,
since it is the most mature non-portable implementation, but hopefully it
will move in that direction soon. This would be a great time for any
contributors who are interested in specific runners to help migrate them
to portable implementations.


> On Tue, Mar 3, 2020 at 6:57 PM Robert Bradshaw <rober...@google.com> wrote:
> >
> > On Tue, Mar 3, 2020 at 9:11 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> > >
> > > > the unification of bounded/unbounded within SplittableDoFn has always
> > > > been a goal.
> > >
> > > I am glad to know that my intuition is correct and that this was
> > > envisioned; the idea of checkpoints for bounded inputs sounds really
> > > useful. Eager to try that in practice.
> > >
> > > An explicit example (with a WatermarkEstimator for a bounded case) would
> > > be really nice to see, for learning purposes. Also, given the
> > > unification goal, what if we align the Bounded SDFs to have similar
> > > signatures? I mean the method that returns a continuation, even for the
> > > Bounded case.
> > >
> > > > Currently the watermark that is reported as part of the PollResult is
> > > > passed to the ProcessContext.updateWatermark [1, 2] function and
> > > > instead that call would be redirected to the
> > > > ManualWatermarkEstimator.setWatermark function [3].
> > >
> > > Is there a JIRA for the Watch adjustments so we don't forget to
> > > integrate the WatermarkEstimators? I am really curious about the
> > > implementation, to see if I finally understand the internals of Watch
> > > too.
> > >
> > > Extra question: Do you think we can have a naive version of Unbounded
> > > SDF like the naive one we have on classical runners (if I understood
> > > correctly, the current one is only for portable runners)? I worry about
> > > the adoption potential.
> >
> > I think we should move to a world where *all* runners become portable
> > runners. That doesn't mean they all need to use docker images, or
> > even GRPC, but I don't think having classical-only or
> > classical-excluded features is where we want to be long-term.
> >
> > > On Tue, Mar 3, 2020 at 1:41 AM Robert Bradshaw <rober...@google.com> wrote:
> > > >
> > > > I don't have a strong preference for using a provider/having a set of
> > > > tightly coupled methods in Java, other than that we be consistent
> > > > (and we already use the methods style for restrictions).
> > > >
> > > > On Mon, Mar 2, 2020 at 3:32 PM Luke Cwik <lc...@google.com> wrote:
> > > > >
> > > > > Jan, there are some parts of Apache Beam that the watermarks package
> > > > > will likely rely on (the @Experimental annotation, javadoc links),
> > > > > but fundamentally it should not rely on core, and someone could
> > > > > create a separate package for this.
> > > >
> > > > I think it does make sense for a set of common watermark trackers to
> > > > be shipped with core (e.g. manual, monotonic, and eventually a
> > > > probabilistic one).
> > > >
> > > > > Ismael, the unification of bounded/unbounded within SplittableDoFn
> > > > > has always been a goal. There is a set of features that
> > > > > BoundedSources are unlikely to use but would still be allowed to use.
> > > > > For example, bounded sources may want support for checkpointing,
> > > > > since I could foresee a BoundedSource that notices that a certain
> > > > > resource has become unavailable and can only process it later. The
> > > > > choice of which watermark estimator to use is a likely point of
> > > > > difference between bounded and unbounded SDFs, since bounded SDFs
> > > > > would be able to use a very simple estimator where the watermark is
> > > > > held at -infinity and only advances to +infinity once there is no
> > > > > more data to process. But even though unbounded SDFs are likely to be
> > > > > the most common users of varied watermark estimators, a bounded SDF
> > > > > may still want to advance the watermark as it reads records, so that
> > > > > runners that are more "streaming" (for example micro-batch) could
> > > > > process the entire pipeline in parallel, vs. other runners that
> > > > > execute one whole segment of the pipeline at a time.
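The simple bounded-SDF estimator described above (watermark held at -infinity until the restriction is exhausted, then jumping to +infinity) can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Beam's API: the `SketchWatermarkEstimator` interface and `BoundedSketchEstimator` class are hypothetical, and `Instant.MIN`/`Instant.MAX` merely stand in for the minimum and maximum timestamps a real runner would use.

```java
import java.time.Instant;

// Hypothetical minimal estimator interface for illustration only;
// it is not Beam's WatermarkEstimator API.
interface SketchWatermarkEstimator<StateT> {
  // The watermark the estimator currently reports.
  Instant currentWatermark();

  // The state that would be checkpointed so the estimator can be rebuilt.
  StateT getState();
}

// Hypothetical estimator for a bounded SDF: the watermark stays at
// "-infinity" (Instant.MIN here) until all data has been processed,
// then advances straight to "+infinity" (Instant.MAX here).
class BoundedSketchEstimator implements SketchWatermarkEstimator<Boolean> {
  private boolean done;

  // The only state is whether the restriction has been fully processed.
  BoundedSketchEstimator(boolean done) {
    this.done = done;
  }

  // Called (in this sketch) once there is no more data to process.
  void markDone() {
    done = true;
  }

  @Override
  public Instant currentWatermark() {
    return done ? Instant.MAX : Instant.MIN;
  }

  @Override
  public Boolean getState() {
    return done;
  }
}
```

Because the only state is a completion flag, such an estimator is trivial to checkpoint. A bounded SDF that wants micro-batch runners to make downstream progress would instead advance the watermark as it reads records, as described above.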
> > > >
> > > > Put another way, the value of watermark trackers is to allow
> > > > processing to continue downstream before the source has completed
> > > > reading. This is of course essential for streaming, but if the source
> > > > is read to completion before downstream stages start (as is the case
> > > > for most batch runners) it is not needed. What this unification does
> > > > allow, however, is for a source to be written in a way that can be
> > > > used efficiently in both batch and streaming mode.
> > > >
> > > > > Currently the watermark that is reported as part of the PollResult
> > > > > is passed to the ProcessContext.updateWatermark [1, 2] function and
> > > > > instead that call would be redirected to the
> > > > > ManualWatermarkEstimator.setWatermark function [3].
> > > > >
> > > > > 1: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
> > > > > 2: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
> > > > > 3: https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
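The redirect Luke describes (the PollResult watermark going to ManualWatermarkEstimator.setWatermark instead of ProcessContext.updateWatermark) might look roughly like the sketch below. Only the `setWatermark` name mirrors the method referenced in [3]; `ManualSketchEstimator`, `SketchPollResult`, and `handlePoll` are hypothetical stand-ins, not the actual Watch internals.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical stand-in for a manual watermark estimator: the DoFn decides
// the watermark itself and reports it explicitly.
class ManualSketchEstimator {
  private Instant watermark;

  ManualSketchEstimator(Instant initialWatermark) {
    this.watermark = initialWatermark;
  }

  // Mirrors the name of ManualWatermarkEstimator.setWatermark from [3].
  void setWatermark(Instant watermark) {
    this.watermark = watermark;
  }

  Instant currentWatermark() {
    return watermark;
  }
}

// Hypothetical shape of one poll: new outputs plus an optional watermark
// reported by the poll function (null if none was reported).
final class SketchPollResult {
  final List<String> outputs;
  final Instant watermark;

  SketchPollResult(List<String> outputs, Instant watermark) {
    this.outputs = outputs;
    this.watermark = watermark;
  }
}

class WatchRedirectSketch {
  // Where a ProcessContext.updateWatermark(...) call used to be, the poll
  // loop forwards the reported watermark to the manual estimator instead.
  static void handlePoll(SketchPollResult result, ManualSketchEstimator estimator) {
    // Emitting result.outputs downstream is omitted in this sketch.
    if (result.watermark != null) {
      estimator.setWatermark(result.watermark);
    }
  }
}
```

A poll that reports no watermark leaves the estimator unchanged in this sketch, so the last reported value keeps holding downstream processing.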
> > > > >
> > > > > On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> > > > >>
> > > > >> I just realized that the HBaseIO example is not a good one because we
> > > > >> can already have Watch-like behavior, as we do for partition
> > > > >> discovery in HCatalogIO. Still, I am interested in your views on
> > > > >> bounded/unbounded unification.
> > > > >>
> > > > >> Interesting question 2: How will these annotations connect with the
> > > > >> Watch transform polling patterns?
> > > > >> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
> > > > >>
> > > > >>
> > > > >> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> > > > >>>
> > > > >>> Really interesting! Implementing the watermark correctly has been a
> > > > >>> common struggle for IO authors, to the point that some IOs still
> > > > >>> have issues around it. So +1 for this, in particular if we can get
> > > > >>> to reuse common patterns. I was not aware of Boyuan's work around
> > > > >>> this, really nice.
> > > > >>>
> > > > >>> One aspect I have always been confused about since I read the SDF
> > > > >>> proposal documents is whether we could have a single API for both
> > > > >>> Bounded and Unbounded IO by somehow treating a BoundedSDF as a
> > > > >>> special case of an UnboundedSDF. Could WatermarkEstimator help in
> > > > >>> this direction?
> > > > >>>
> > > > >>> One quick case I can think of is making the current HBaseIO SDF work
> > > > >>> in an unbounded manner, for example to 'watch and read new tables'.
> > > > >>>
> > > > >>>
> > > > >>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <lc...@google.com> wrote:
> > > > >>>>
> > > > >>>> See this doc [1] and blog [2] for some context about
> > > > >>>> SplittableDoFns.
> > > > >>>>
> > > > >>>> To support watermark reporting within the Java SDK for
> > > > >>>> SplittableDoFns, we need a way for SDF authors to report watermark
> > > > >>>> estimates over the element and restriction pair that they are
> > > > >>>> processing.
> > > > >>>>
> > > > >>>> For UnboundedSources, asking each SDF author to write their own
> > > > >>>> watermark estimation was found to be a pain point, and it typically
> > > > >>>> prevented re-use. Therefore we would like to have a "library" of
> > > > >>>> watermark estimators that help SDF authors perform this estimation,
> > > > >>>> similar to how there is a "library" of restrictions and restriction
> > > > >>>> trackers that SDF authors can use. SDF authors for whom the existing
> > > > >>>> library doesn't work can add additional estimators that observe the
> > > > >>>> timestamps of elements, or choose to report the watermark directly
> > > > >>>> through a "ManualWatermarkEstimator" parameter that can be supplied
> > > > >>>> to @ProcessElement methods.
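As a rough illustration of an estimator that could live in such a "library", here is a sketch of one that observes output timestamps and keeps the watermark monotonic. `MonotonicSketchEstimator` and its methods are hypothetical names chosen for this example, not the API proposed in [3]; the sketch assumes the framework calls `observeTimestamp` for every output and that outputs arrive in roughly increasing timestamp order.

```java
import java.time.Instant;

// Hypothetical "observes timestamps" estimator: the framework would call
// observeTimestamp for each output, and the watermark never moves backwards.
class MonotonicSketchEstimator {
  private Instant watermark;

  // The initial state is simply the starting watermark.
  MonotonicSketchEstimator(Instant initialState) {
    this.watermark = initialState;
  }

  // Advance the watermark only if the observed timestamp is newer.
  void observeTimestamp(Instant timestamp) {
    if (timestamp.isAfter(watermark)) {
      watermark = timestamp;
    }
  }

  Instant currentWatermark() {
    return watermark;
  }

  // The state to checkpoint is just the latest watermark.
  Instant getState() {
    return watermark;
  }
}
```

Because the watermark here is simply the largest timestamp seen so far, any later output with an earlier timestamp falls behind the watermark (potentially late data). That trade-off is one reason an author who knows more about their source might prefer reporting the watermark manually.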
> > > > >>>>
> > > > >>>> The public-facing portion of the DoFn changes adds three new annotations for new DoFn-style methods:
> > > > >>>> GetInitialWatermarkEstimatorState: Returns the initial watermark estimator state, similar to GetInitialRestriction.
> > > > >>>> GetWatermarkEstimatorStateCoder: Returns a coder compatible with the watermark estimator state type, similar to GetRestrictionCoder for restrictions returned by GetInitialRestriction.
> > > > >>>> NewWatermarkEstimator: Returns a watermark estimator: either one that the framework invokes, allowing it to observe the timestamps of output records, or a manual watermark estimator that can be explicitly invoked to update the watermark.
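The three annotations fit together as a small lifecycle: produce an initial state, build an estimator from it, and encode its state at a checkpoint so that a resumed restriction can continue from it. The plain-Java sketch below walks through that lifecycle. The method names only echo the annotations and are not the Beam `DoFn` signatures; using an `Instant` as the state type (seeded with the element's timestamp) and a hand-rolled 12-byte encoding in place of a real Coder are assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.time.Instant;

// Hypothetical plain-Java walk-through of the watermark estimator lifecycle.
// Names echo the proposed annotations; this is NOT the Beam DoFn API.
class WatermarkLifecycleSketch {
  // ~ GetInitialWatermarkEstimatorState: assumed here to start from the
  // element's timestamp.
  static Instant getInitialWatermarkEstimatorState(Instant elementTimestamp) {
    return elementTimestamp;
  }

  // ~ GetWatermarkEstimatorStateCoder: a hand-rolled stand-in for a Coder,
  // writing epoch seconds (8 bytes) followed by nanos (4 bytes).
  static byte[] encodeState(Instant state) {
    return ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
        .putLong(state.getEpochSecond())
        .putInt(state.getNano())
        .array();
  }

  static Instant decodeState(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    long seconds = buf.getLong();
    int nanos = buf.getInt();
    return Instant.ofEpochSecond(seconds, nanos);
  }

  // ~ NewWatermarkEstimator: builds an estimator from (possibly restored) state.
  static Estimator newWatermarkEstimator(Instant state) {
    return new Estimator(state);
  }

  // Minimal estimator that advances with observed output timestamps.
  static final class Estimator {
    private Instant watermark;

    Estimator(Instant watermark) {
      this.watermark = watermark;
    }

    void observeTimestamp(Instant timestamp) {
      if (timestamp.isAfter(watermark)) {
        watermark = timestamp;
      }
    }

    Instant getState() {
      return watermark;
    }
  }
}
```

In this sketch, a checkpoint amounts to `encodeState(estimator.getState())`, and resuming amounts to `newWatermarkEstimator(decodeState(bytes))`, analogous to how a restriction and its coder travel together.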
> > > > >>>>
> > > > >>>> See [3] for an initial PR with the public-facing additions to the
> > > > >>>> core Java API related to SplittableDoFn.
> > > > >>>>
> > > > >>>> This mirrors a bunch of work that was done by Boyuan within the
> > > > >>>> Python SDK [4, 5], but in the style of the new DoFn parameter/method
> > > > >>>> invocation we have in the Java SDK.
> > > > >>>>
> > > > >>>> 1: https://s.apache.org/splittable-do-fn
> > > > >>>> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> > > > >>>> 3: https://github.com/apache/beam/pull/10992
> > > > >>>> 4: https://github.com/apache/beam/pull/9794
> > > > >>>> 5: https://github.com/apache/beam/pull/10375
>
