Can you confirm that the following implementation of trySplit will work as
intended (from an OffsetRangeTracker subclass)?

  @Override
  public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
    return SplitResult.of(
        new OffsetRange(currentRestriction().getFrom(), lastClaimedOffset + 1),
        new OffsetRange(lastClaimedOffset + 1, Long.MAX_VALUE));
  }
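
To make the question concrete, here is a minimal, dependency-free sketch of
the behaviour I am assuming. CheckpointSplitSketch, Range and Tracker are
hypothetical stand-ins, not Beam's OffsetRange or OffsetRangeTracker. The
sketch assumes two things that the snippet above does not do: the tracker
narrows its own restriction to the primary (which is how I read the trySplit
javadoc quoted further down), and the "nothing claimed yet" case is handled
explicitly instead of adding 1 to an unset offset.

```java
final class CheckpointSplitSketch {
  /** Hypothetical stand-in for an offset range: the half-open interval [from, to). */
  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }
  }

  /** Hypothetical checkpoint-only tracker over a possibly unbounded range. */
  static final class Tracker {
    private Range range;
    private Long lastClaimed = null; // stays null until the first successful claim

    Tracker(Range range) {
      this.range = range;
    }

    Range currentRestriction() {
      return range;
    }

    /** Claims an offset; fails once the offset is outside the (possibly narrowed) range. */
    boolean tryClaim(long offset) {
      if (offset < range.from || offset >= range.to) {
        return false;
      }
      lastClaimed = offset;
      return true;
    }

    /**
     * Ignores the fraction and always splits right after the last claimed offset.
     * Returns {primary, residual}, or null when nothing is left to hand back.
     */
    Range[] trySplit(double fractionOfRemainder) {
      long splitPos = (lastClaimed == null) ? range.from : lastClaimed + 1;
      if (splitPos >= range.to) {
        return null;
      }
      Range primary = new Range(range.from, splitPos);
      Range residual = new Range(splitPos, range.to);
      range = primary; // the tracker itself now covers only the bounded primary
      return new Range[] {primary, residual};
    }
  }
}
```

In this model, once a split has happened the tracker only covers a bounded
range, and a second trySplit returns null, so only the residual is ever
unbounded. Whether the real runner relies on the same invariant is exactly
what I would like confirmed.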
> It would be nice to update the documentation if that's confusing.
If this is indeed the behavior, could you please update the documentation
to state it explicitly? In particular, it would be great if it confirmed
that, with this pattern, two unbounded restrictions are never sent to DoFns
running at the same time.
In addition, I'm not quite sure how this works. When trySplit is called, it
returns two OffsetRanges, which carry no information about claimed offsets.
How is the primary converted into an instance of my OffsetRangeTracker
subclass with offsets already claimed? Does the runtime call the following
(and, importantly, is it required to)?

  MyRestrictionTracker tracker = MyDoFn.newTracker(splitResult.getPrimary());
  tracker.tryClaim(previousClaimed);
  tracker.checkDone();
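
For what it's worth, my reading of the description quoted below is that no
new tracker is built for the primary: trySplit is called on the existing
tracker, which already knows what was claimed, and checkDone is then called
on that same tracker. Below is a small model of that reading, under those
assumptions. ResumeFlowSketch, Tracker, checkpoint and onResume are
hypothetical names for illustration, not Beam classes or methods.

```java
final class ResumeFlowSketch {
  /** Hypothetical tracker over the half-open range [from, to); not a Beam class. */
  static final class Tracker {
    final long from;
    long to;
    Long lastClaimed = null; // null until the first successful claim

    Tracker(long from, long to) {
      this.from = from;
      this.to = to;
    }

    boolean tryClaim(long offset) {
      if (offset < from || offset >= to) {
        return false;
      }
      lastClaimed = offset;
      return true;
    }

    /** Narrows this tracker to the primary and returns the residual as {from, to}. */
    long[] checkpoint() {
      long splitPos = (lastClaimed == null) ? from : lastClaimed + 1;
      if (splitPos >= to) {
        return null; // nothing left to defer
      }
      long[] residual = {splitPos, to};
      to = splitPos;
      return residual;
    }

    /** Throws if some offset in [from, to) was never claimed. */
    void checkDone() {
      if (from == to) {
        return; // an empty range is trivially done
      }
      if (lastClaimed == null || lastClaimed < to - 1) {
        long next = (lastClaimed == null) ? from : lastClaimed + 1;
        throw new IllegalStateException(
            "claiming work in [" + next + ", " + to + ") was not attempted");
      }
    }
  }

  /** Models the handling of resume(): split first, then verify the primary on the same tracker. */
  static long[] onResume(Tracker tracker) {
    long[] residual = tracker.checkpoint();
    tracker.checkDone();
    return residual;
  }
}
```

With a tracker over [0, MAX) that has claimed 5, onResume narrows the tracker
to [0, 6), checkDone passes, and [6, MAX) comes back as the residual. Calling
checkDone without the split throws, which matches the error I originally
reported.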
On Sat, Nov 28, 2020 at 2:08 AM Boyuan Zhang <[email protected]> wrote:
> It would be nice to update the documentation if that's confusing.
>
> On Fri, Nov 27, 2020 at 11:05 PM Daniel Collins <[email protected]>
> wrote:
>
>> I think the documentation for trySplit() doesn't make it clear that it
>> supports this use case. In particular this section:
>>
>> > This invocation updates the {@link #currentRestriction()} to be the
>> primary restriction effectively having the current {@link
>> DoFn.ProcessElement} execution responsible for performing the work that the
>> primary restriction represents. The residual restriction will be executed
>> in a separate {@link DoFn.ProcessElement} invocation (likely in a different
>> process). The work performed by executing the primary and residual
>> restrictions as separate {@link DoFn.ProcessElement} invocations MUST be
>> equivalent to the work performed as if this split never occurred.
>>
>> Implies that the runner will try to run both restrictions again on
>> separate workers. This is not the behavior I am looking for, hence my
>> confusion. Can we change the documentation here to make clear that
>> checkDone will be called on the primary restriction in the output to ensure
>> that it is actually completed if the trySplit call was triggered by a call
>> to resume()?
>>
>> On Sat, Nov 28, 2020 at 1:58 AM Boyuan Zhang <[email protected]> wrote:
>>
>>> And if you look into the RestrictionTracker javadoc[1], it explains
>>> what it means when you return null from trySplit.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L104-L107
>>>
>>> On Fri, Nov 27, 2020 at 10:54 PM Boyuan Zhang <[email protected]>
>>> wrote:
>>>
>>>> To the extent I can see, this never mentions the restriction that you
>>>>> <need> to implement a split() that returns a bounded restriction if
>>>>> returning resume() from an SDF. Nor does this restriction particularly
>>>>> make
>>>>> sense if the range being processed is itself unbounded? Perhaps you would
>>>>> consider not calling checkDone() on resume() if the restriction provided
>>>>> to
>>>>> the runner is unbounded since it would be unreasonable to complete an
>>>>> unbounded restriction?
>>>>
>>>>
>>>> It seems like you are not familiar with how Beam deals with resume(), so
>>>> let's start from this part. Let's say that your SDF is processing a
>>>> restriction of [0, MAX) and so far you have done tryClaim(5), and you
>>>> want to return resume() at this point. When you return resume() from here,
>>>> the Beam Java DoFn invoker will know you want to resume and call your
>>>> restrictionTracker.trySplit(fractionOfRemainder=0.0) to get the residual
>>>> from [0, MAX). Ideally, your RestrictionTracker should return [0, 6) as
>>>> your current restriction and [6, MAX) as the residual. Then the Beam Java
>>>> DoFn invoker will call checkDone on the restriction [0, 6) to double check
>>>> that your SDF has called tryClaim(5), ensuring there is no data loss. Then
>>>> the SDK will return the [6, MAX) restriction back to the runner (in your
>>>> case that's Dataflow) and the runner will reschedule [6, MAX) based on its
>>>> scheduling strategy. That's why, if you want to use resume(), you need to
>>>> implement trySplit. That's also why trySplit and checkDone make sense on
>>>> an unbounded restriction.
>>>>
>>>> Perhaps it would be better to explain in terms of why I'm trying to do
>>>>> this. If the subscription has not received any data in a while, or is
>>>>> receiving data infrequently, I want to enable dataflow to scale down to 1
>>>>> worker, but there will be no need to call "tryClaim" if there is no new
>>>>> data from Pub/Sub Lite. All I want to do is, if data is arriving
>>>>> infrequently, give dataflow the opportunity to scale my job down. I'm not
>>>>> actually done with the data because, as new data can always be published
>>>>> in
>>>>> the future, we can't know that, and I'm trying to avoid needing to
>>>>> implement bounded reads to artificially produce sub-windows when an
>>>>> unbounded output is much more natural.
>>>>
>>>>
>>>> So you are referring to the resume use case. Please note that even
>>>> though you are returning resume() from your SDF, that doesn't mean
>>>> Dataflow will guarantee that the job will be downscaled to 1 worker. But
>>>> resume() can indeed help you free some workers to process other work,
>>>> compared to having your SDF do a busy wait.
>>>>
>>>> This is good to know. So to rephrase, could I periodically call
>>>>> tryClaim(<last sent message or getFrom if none yet>) to yield control back
>>>>> to the runtime?
>>>>
>>>>
>>>> You can do so by implementing RestrictionTracker.trySplit() and using
>>>> resume().
>>>>
>>>> You may also want to take a look at Kafka example[1]. Hope that is
>>>> helpful.
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
>>>>
>>>>
>>>>
>>>> On Fri, Nov 27, 2020 at 10:34 PM Daniel Collins <[email protected]>
>>>> wrote:
>>>>
>>>>> > There is an up-to-date SDF programming guide[1], and [2] specifically
>>>>> covers SDF-initiated checkpointing
>>>>>
>>>>> To the extent I can see, this never mentions the restriction that you
>>>>> <need> to implement a split() that returns a bounded restriction if
>>>>> returning resume() from an SDF. Nor does this restriction particularly
>>>>> make
>>>>> sense if the range being processed is itself unbounded? Perhaps you would
>>>>> consider not calling checkDone() on resume() if the restriction provided
>>>>> to
>>>>> the runner is unbounded since it would be unreasonable to complete an
>>>>> unbounded restriction?
>>>>>
>>>>> > It depends on the definition of no new data to enable rescheduling.
>>>>>
>>>>> Perhaps it would be better to explain in terms of why I'm trying to do
>>>>> this. If the subscription has not received any data in a while, or is
>>>>> receiving data infrequently, I want to enable dataflow to scale down to 1
>>>>> worker, but there will be no need to call "tryClaim" if there is no new
>>>>> data from Pub/Sub Lite. All I want to do is, if data is arriving
>>>>> infrequently, give dataflow the opportunity to scale my job down. I'm not
>>>>> actually done with the data because, as new data can always be published
>>>>> in
>>>>> the future, we can't know that, and I'm trying to avoid needing to
>>>>> implement bounded reads to artificially produce sub-windows when an
>>>>> unbounded output is much more natural.
>>>>>
>>>>> > Please note that when an SDF is processing one element
>>>>> restriction pair, the start of the restriction never changes. You will
>>>>> always get the same offset when you call currentRestriction().getFrom().
>>>>>
>>>>> This is good to know. So to rephrase, could I periodically call
>>>>> tryClaim(<last sent message or getFrom if none yet>) to yield control back
>>>>> to the runtime?
>>>>>
>>>>> -Dan
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Nov 28, 2020 at 1:21 AM Boyuan Zhang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> > Okay, is this documented anywhere? In particular,
>>>>>> https://s.apache.org/splittable-do-fn seems out of date, since it
>>>>>> implies resume() should be returned when tryClaim returns false.
>>>>>>
>>>>>> There is an up-to-date SDF programming guide[1], and [2] specifically
>>>>>> covers SDF-initiated checkpointing. stop() should be returned when
>>>>>> tryClaim returns false, whereas resume() should be returned when the
>>>>>> restriction is not fully processed and you want to defer processing
>>>>>> to the future.
>>>>>>
>>>>>> > If this is the case, is there any way I can yield control to the
>>>>>> runtime if I have no new data to enable rescheduling? For example, can I
>>>>>> call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>>>>
>>>>>> It depends on the definition of no new data to enable rescheduling.
>>>>>> If you believe that you are done with the current restriction even
>>>>>> though you have not reached its end, you can explicitly say so by
>>>>>> calling restrictionTracker.tryClaim(MAX_LONG) (or
>>>>>> tryClaim(restriction.getTo()) if you are sure the end of the restriction
>>>>>> has not been changed by any splitting).
>>>>>> If you just want to re-process the rest of the restriction after a
>>>>>> certain time, e.g. 5 mins, 30 mins and so on, you need to implement
>>>>>> trySplit and return resume().withResumeDelay(duration) when you want to
>>>>>> resume.
>>>>>>
>>>>>> Please note that when an SDF is processing one element
>>>>>> restriction pair, the start of the restriction never changes. You will
>>>>>> always get the same offset when you call currentRestriction().getFrom().
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>>>>> [2]
>>>>>> https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint
>>>>>>
>>>>>> On Fri, Nov 27, 2020 at 10:07 PM Daniel Collins <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> > you can either never return resume() from your SDF or implement
>>>>>>> suitable trySplit() logic for your RestrictionTracker
>>>>>>>
>>>>>>> Okay, is this documented anywhere? In particular,
>>>>>>> https://s.apache.org/splittable-do-fn seems out of date, since it
>>>>>>> implies resume() should be returned when tryClaim returns false.
>>>>>>>
>>>>>>> If this is the case, is there any way I can yield control to the
>>>>>>> runtime if I have no new data to enable rescheduling? For example, can I
>>>>>>> call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>>>>>
>>>>>>> On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> > IIUC, this should never happen as long as I return null to
>>>>>>>> trySplit. Is this not the case? (trySplit implementation below)
>>>>>>>>
>>>>>>>> I noticed that in your implementation you return null for your
>>>>>>>> RestrictionTracker.trySplit. That means you cannot return resume() from
>>>>>>>> your SplittableDoFn.process() body since resume() means
>>>>>>>> performing SplittableDoFn self-initiated checkpointing and
>>>>>>>> deferring processing residuals.
>>>>>>>>
>>>>>>>> In your case, you can either never return resume() from your SDF or
>>>>>>>> implement suitable trySplit() logic for your RestrictionTracker. For
>>>>>>>> example, KafkaIO is using GrowableOffsetRangeTracker[1] to track an
>>>>>>>> infinite restriction.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
>>>>>>>>
>>>>>>>> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> > Please note that your current restriction might be changed to a
>>>>>>>>> finite restriction during processing one bundle if you do
>>>>>>>>> SplittableDoFn
>>>>>>>>> self-initiated checkpointing or any runner issued splits
>>>>>>>>>
>>>>>>>>> IIUC, this should never happen as long as I return null to
>>>>>>>>> trySplit. Is this not the case? (trySplit implementation below)
>>>>>>>>>
>>>>>>>>> @Override
>>>>>>>>> public SplitResult<OffsetRange> trySplit(double
>>>>>>>>> fractionOfRemainder) { return null; }
>>>>>>>>>
>>>>>>>>> > what will you do if we reach the finally block?
>>>>>>>>>
>>>>>>>>> At that point an exception is being thrown out of the
>>>>>>>>> processElement function. The answer to that would be "what will the
>>>>>>>>> runtime
>>>>>>>>> do if an exception is thrown out of the processElement function"
>>>>>>>>>
>>>>>>>>> > Open a WIP PR
>>>>>>>>>
>>>>>>>>> I have, but I'm staging changes in a separate repo. See
>>>>>>>>> https://github.com/googleapis/java-pubsublite/pull/390 (although
>>>>>>>>> this incorporates other changes, see PubsubLitePartitionSdf.java
>>>>>>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1>
>>>>>>>>> and PubsubLiteOffsetRangeTracker.java
>>>>>>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67>
>>>>>>>>> for
>>>>>>>>> the sdf and restriction tracker implementations)
>>>>>>>>>
>>>>>>>>> -Dan
>>>>>>>>>
>>>>>>>>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello Boyuan,
>>>>>>>>>>>
>>>>>>>>>>> Responses inline.
>>>>>>>>>>>
>>>>>>>>>>> > The checkDone is invoked by the SDK harness to guarantee that
>>>>>>>>>>> when you exit your SplittableDoFn.process you must have completed
>>>>>>>>>>> all the work in the current restriction
>>>>>>>>>>>
>>>>>>>>>>> This is impossible with unbounded restrictions since, by
>>>>>>>>>>> definition, all work cannot be completed.
>>>>>>>>>>>
>>>>>>>>>> Please note that your current restriction might be changed to a
>>>>>>>>>> finite restriction during processing one bundle if you do
>>>>>>>>>> SplittableDoFn
>>>>>>>>>> self-initiated checkpointing or any runner issued splits.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> > In your case, it seems like after you do tryClaim(3188439),
>>>>>>>>>>> you return stop() directly from your SplittableDoFn.process function
>>>>>>>>>>>
>>>>>>>>>>> This is not true. The code in question is below. stop() is only
>>>>>>>>>>> returned if tryClaim returns false.
>>>>>>>>>>>
>>>>>>>>>>> -Dan
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>>>>>>>>>>   logger.atWarning().log("Failed to claim initial restriction for partition " + partition);
>>>>>>>>>>>   return ProcessContinuation.stop();
>>>>>>>>>>> }
>>>>>>>>>>> sleepTimeRemaining = maxSleepTime;
>>>>>>>>>>> Committer committer = committerSupplier.apply(partition);
>>>>>>>>>>> committer.startAsync().awaitRunning();
>>>>>>>>>>> try (PullSubscriber<SequencedMessage> subscriber =
>>>>>>>>>>>     subscriberFactory.apply(partition, Offset.of(tracker.currentRestriction().getFrom()))) {
>>>>>>>>>>>   while (true) {
>>>>>>>>>>>     List<SequencedMessage> messages = doPoll(subscriber);
>>>>>>>>>>>     // We polled for as long as possible, yield to the runtime to allow it to
>>>>>>>>>>>     // reschedule us on a new task.
>>>>>>>>>>>     if (messages.isEmpty()) {
>>>>>>>>>>>       logger.atWarning().log("Yielding due to timeout on partition " + partition);
>>>>>>>>>>>       return ProcessContinuation.resume();
>>>>>>>>>>>     }
>>>>>>>>>>>     long lastOffset = Iterables.getLast(messages).offset().value();
>>>>>>>>>>>     if (tracker.tryClaim(lastOffset)) {
>>>>>>>>>>>       messages.forEach(
>>>>>>>>>>>           message ->
>>>>>>>>>>>               receiver.outputWithTimestamp(
>>>>>>>>>>>                   message, new Instant(Timestamps.toMillis(message.publishTime()))));
>>>>>>>>>>>       committer.commitOffset(Offset.of(lastOffset + 1)).get();
>>>>>>>>>>>     } else {
>>>>>>>>>>>       logger.atWarning().log("Stopping partition " + partition);
>>>>>>>>>>>       return ProcessContinuation.stop();
>>>>>>>>>>>     }
>>>>>>>>>>>   }
>>>>>>>>>>> } finally {
>>>>>>>>>>>   committer.stopAsync().awaitTerminated();
>>>>>>>>>>> }
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>> From your code, what will you do if we reach the finally block?
>>>>>>>>>> Would you like to open a WIP PR to show more details?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Daniel,
>>>>>>>>>>>>
>>>>>>>>>>>> The checkDone is invoked by the SDK harness to guarantee that
>>>>>>>>>>>> when you exit your SplittableDoFn.process (whether you return
>>>>>>>>>>>> stop() or resume()), you must have completed all the work in the
>>>>>>>>>>>> current restriction. This is one of the major ways for
>>>>>>>>>>>> SplittableDoFn to prevent data loss.
>>>>>>>>>>>>
>>>>>>>>>>>> In your case, it seems like after you do tryClaim(3188439), you
>>>>>>>>>>>> return stop() directly from your SplittableDoFn.process function.
>>>>>>>>>>>> That's not the correct way to work with a restriction and
>>>>>>>>>>>> restriction tracker. You should either return resume() to perform
>>>>>>>>>>>> a SplittableDoFn-initiated checkpoint and defer processing of the
>>>>>>>>>>>> restriction [3188439, 9223372036854775807), or return stop() only
>>>>>>>>>>>> when tryClaim() has returned false.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running
>>>>>>>>>>>>> into the following error on dataflow with a RestrictionTracker
>>>>>>>>>>>>> returning
>>>>>>>>>>>>> UNBOUNDED to isBounded. It looks like calls are being made to
>>>>>>>>>>>>> `checkDone`,
>>>>>>>>>>>>> but looking at the documentation of `checkDone`, I don't think
>>>>>>>>>>>>> there's any
>>>>>>>>>>>>> rational thing I can do in this case. Does anyone know what
>>>>>>>>>>>>> should be done
>>>>>>>>>>>>> for this method?
>>>>>>>>>>>>>
>>>>>>>>>>>>> The following exist in the RestrictionTracker javadoc:
>>>>>>>>>>>>> -Must throw an exception with an informative error message, if
>>>>>>>>>>>>> there is still any unclaimed work remaining in the restriction.
>>>>>>>>>>>>> (there is,
>>>>>>>>>>>>> the restriction is unbounded)
>>>>>>>>>>>>> -{@link RestrictionTracker#checkDone} MUST succeed
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Dan
>>>>>>>>>>>>>
>>>>>>>>>>>>> "Error message from worker: java.lang.IllegalStateException:
>>>>>>>>>>>>> Last attempted offset was 3188438 in range [1998348,
>>>>>>>>>>>>> 9223372036854775807),
>>>>>>>>>>>>> claiming work in [3188439, 9223372036854775807) was not attempted
>>>>>>>>>>>>>
>>>>>>>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>>>>>>>>>>
>>>>>>>>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>>>>>>>>>>
>>>>>>>>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>>>>>>>>>>
>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>>>>>>>>>>
>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>>>>>>>>>>
>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
>>>>>>>>>>>>>
>>>>>>>>>>>>