I don't have a strong preference between using a provider and having a
set of tightly coupled methods in Java, other than that we should be
consistent (and we already use the methods style for restrictions).

On Mon, Mar 2, 2020 at 3:32 PM Luke Cwik <lc...@google.com> wrote:
>
> Jan, there are some parts of Apache Beam that the watermarks package will
> likely rely on (@Experimental annotation, javadoc links), but fundamentally
> it should not rely on core, and someone could create a separate package for
> this.

I think it does make sense for a set of common watermark trackers to
be shipped with core (e.g. manual, monotonic, and eventually a
probabilistic one).
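
As a rough illustration of what the manual and monotonic trackers could
look like, here is a minimal sketch. The type and method names
(WatermarkTrackers, Estimator, Manual, Monotonic, observeTimestamp) are
made up for this example and are not the API in the PR; timestamps are
plain epoch milliseconds.

```java
// Hypothetical sketch: these types are illustrative and are NOT Beam's API.
// Timestamps are plain epoch milliseconds.
final class WatermarkTrackers {
  private WatermarkTrackers() {}

  /** Anything that can report a current watermark estimate. */
  interface Estimator {
    long currentWatermark();
  }

  /** Manual tracker: the source author sets the watermark explicitly. */
  static final class Manual implements Estimator {
    private long watermark;

    Manual(long initialWatermark) {
      this.watermark = initialWatermark;
    }

    void setWatermark(long newWatermark) {
      if (newWatermark < watermark) {
        throw new IllegalArgumentException("watermark must not move backwards");
      }
      watermark = newWatermark;
    }

    @Override
    public long currentWatermark() {
      return watermark;
    }
  }

  /**
   * Monotonic tracker: observes output timestamps that are expected to be
   * non-decreasing. Keeping the maximum means a stray older timestamp
   * never moves the estimate backwards.
   */
  static final class Monotonic implements Estimator {
    private long watermark;

    Monotonic(long initialWatermark) {
      this.watermark = initialWatermark;
    }

    void observeTimestamp(long timestamp) {
      watermark = Math.max(watermark, timestamp);
    }

    @Override
    public long currentWatermark() {
      return watermark;
    }
  }
}
```

The point is only that both flavors fit behind one small interface, so
shipping a few of them with core should be cheap.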

> Ismael, the unification of bounded/unbounded within SplittableDoFn has always 
> been a goal. There is a set of features that BoundedSources are unlikely to
> use but would still be allowed to use. For example, bounded sources may
> want to have support for checkpointing since I could foresee a BoundedSource
> that can notice that a certain resource becomes unavailable and can only 
> process it later. The choice of which watermark estimator to use is a likely 
> point of difference between bounded and unbounded SDFs since bounded SDFs 
> would be able to use a very simple estimator where the watermark is held at 
> -infinity and only advances to +infinity once there is no more data to 
> process. But even though unbounded SDFs are likely to be the most common 
> users of varied watermark estimators, a bounded SDF may still want to advance 
> the watermark as it reads records so that runners that are more "streaming"
> (for example micro batch) could process the entire pipeline in parallel vs 
> other runners that execute one whole segment of the pipeline at a time.

Put another way, the value of watermark trackers is to allow
processing to continue downstream before the source has completed
reading. This is of course essential for streaming, but if the source
is read to completion before downstream stages start (as is the case
for most batch runners), it is not needed. What this unification does
allow, however, is for a source to be written in such a way that it
can be used efficiently in both batch and streaming mode.
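
To make the batch case concrete, here is a minimal sketch of the "hold
at -infinity, then jump to +infinity" estimator described in the quoted
text. Everything here is hypothetical (BoundedWatermarkSketch,
HoldUntilDone, canFireWindow); Long.MIN_VALUE and Long.MAX_VALUE stand
in for -infinity and +infinity.

```java
// Hypothetical sketch: illustrative names only, NOT Beam's API.
final class BoundedWatermarkSketch {
  private BoundedWatermarkSketch() {}

  static final long MIN_WATERMARK = Long.MIN_VALUE; // stand-in for -infinity
  static final long MAX_WATERMARK = Long.MAX_VALUE; // stand-in for +infinity

  /** Holds the watermark at the minimum until the source reports it is done. */
  static final class HoldUntilDone {
    private boolean done;

    void markDone() {
      done = true;
    }

    long currentWatermark() {
      return done ? MAX_WATERMARK : MIN_WATERMARK;
    }
  }

  /** A downstream stage can close a window once the watermark reaches its end. */
  static boolean canFireWindow(long watermark, long windowEndMillis) {
    return watermark >= windowEndMillis;
  }
}
```

With this estimator nothing downstream can fire until the read
finishes, which matches what most batch runners do anyway. A source
that instead advances the watermark as it reads would let a more
"streaming" runner fire windows earlier.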

> Currently the watermark that is reported as part of the PollResult is passed
> to the ProcessContext.updateWatermark [1, 2] function; instead, that call
> would be redirected to the ManualWatermarkEstimator.setWatermark function [3].
>
> 1: 
> https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
> 2: 
> https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
> 3: 
> https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
>
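
A rough sketch of that redirection, as I understand it. PollResult and
ManualEstimator below are simplified stand-ins written for this
example, not the real Watch.Growth.PollResult or
ManualWatermarkEstimator classes, and handlePoll is a made-up helper.

```java
// Hypothetical sketch: simplified stand-ins, NOT the real Beam classes.
final class PollForwardingSketch {
  private PollForwardingSketch() {}

  /** Stand-in for a poll result: newly discovered outputs plus a watermark. */
  static final class PollResult {
    final String[] outputs;
    final long watermarkMillis;

    PollResult(String[] outputs, long watermarkMillis) {
      this.outputs = outputs;
      this.watermarkMillis = watermarkMillis;
    }
  }

  /** Stand-in for a manual estimator that is updated explicitly. */
  static final class ManualEstimator {
    private long watermarkMillis = Long.MIN_VALUE;

    void setWatermark(long newWatermarkMillis) {
      watermarkMillis = newWatermarkMillis;
    }

    long currentWatermark() {
      return watermarkMillis;
    }
  }

  /**
   * Forwards the watermark carried by the poll result to the estimator
   * (rather than to the process context) and returns the output count.
   */
  static int handlePoll(PollResult result, ManualEstimator estimator) {
    estimator.setWatermark(result.watermarkMillis);
    return result.outputs.length;
  }
}
```
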
> On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>
>> I just realized that the HBaseIO example is not a good one because we can
>> already have Watch-like behavior, as we do for partition discovery in
>> HCatalogIO.
>> Still, I am interested in your views on bounded/unbounded unification.
>>
>> Interesting question 2: How will these annotations connect with the Watch
>> transform's polling patterns?
>> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
>>
>>
>> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>
>>> Really interesting! Implementing the watermark correctly has been a common
>>> struggle for IO authors, to the point that some IOs still have issues around
>>> that. So +1 for this, in particular if we can get to reuse common patterns.
>>> I was not aware of Boyuan's work around this, really nice.
>>>
>>> One aspect I have always been confused about since I read the SDF proposal
>>> documents is whether we could get to a single API for both Bounded and
>>> Unbounded IO by somehow treating a BoundedSDF as a special case of an
>>> UnboundedSDF. Could WatermarkEstimator help in this direction?
>>>
>>> One quick case that I can think of is to make the current HBaseIO SDF work
>>> in an unbounded manner, for example to 'watch and read new tables'.
>>>
>>>
>>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>> See this doc[1] and blog[2] for some context about SplittableDoFns.
>>>>
>>>> To support watermark reporting within the Java SDK for SplittableDoFns, we
>>>> need a way for SDF authors to report watermark estimates over the
>>>> element and restriction pair that they are processing.
>>>>
>>>> For UnboundedSources, it was found to be a pain point to ask each SDF 
>>>> author to write their own watermark estimation which typically prevented 
>>>> re-use. Therefore we would like to have a "library" of watermark 
>>>> estimators that help SDF authors perform this estimation similar to how 
>>>> there is a "library" of restrictions and restriction trackers that SDF 
>>>> authors can use. For SDF authors where the existing library doesn't work, 
>>>> they can add additional ones that observe timestamps of elements or choose 
>>>> to directly report the watermark through a "ManualWatermarkEstimator" 
>>>> parameter that can be supplied to @ProcessElement methods.
>>>>
>>>> The public facing portion of the DoFn changes adds three new annotations 
>>>> for new DoFn style methods:
>>>> GetInitialWatermarkEstimatorState: Returns the initial watermark state, 
>>>> similar to GetInitialRestriction
>>>> GetWatermarkEstimatorStateCoder: Returns a coder compatible with watermark 
>>>> state type, similar to GetRestrictionCoder for restrictions returned by 
>>>> GetInitialRestriction.
>>>> NewWatermarkEstimator: Returns a watermark estimator that either the 
>>>> framework invokes allowing it to observe the timestamps of output records 
>>>> or a manual watermark estimator that can be explicitly invoked to update 
>>>> the watermark.
>>>>
>>>> See [3] for an initial PR with the public facing additions to the core 
>>>> Java API related to SplittableDoFn.
>>>>
>>>> This mirrors a bunch of work that was done by Boyuan within the Python SDK
>>>> [4, 5] but in the style of new DoFn parameter/method invocation we have in 
>>>> the Java SDK.
>>>>
>>>> 1: https://s.apache.org/splittable-do-fn
>>>> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>>>> 3: https://github.com/apache/beam/pull/10992
>>>> 4: https://github.com/apache/beam/pull/9794
>>>> 5: https://github.com/apache/beam/pull/10375
