It would be nice to update the documentation if that's confusing.
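
To make the sequence concrete for anyone reading along, here is a minimal sketch of the resume() walkthrough quoted below: tryClaim(5) on [0, MAX), a checkpoint split into [0, 6) and [6, MAX), then checkDone on the primary. It also reproduces the failure from the original report, where trySplit returns null. This is a simplified stand-in, not Beam's OffsetRangeTracker: the class and method names (ResumeSplitSketch, Tracker, checkpoint) are made up for illustration, and offsets are assumed to be non-negative.

```java
// Simplified model of an offset-range restriction tracker.
// NOT the Beam API: all names here are hypothetical and only illustrate the flow.
final class ResumeSplitSketch {

  /** Half-open offset range [from, to). */
  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public String toString() {
      return "[" + from + ", " + to + ")";
    }
  }

  static final class Tracker {
    private Range range;
    private long lastAttempted = -1; // -1 means "nothing attempted yet"
    private final boolean splittable; // false models a trySplit that always returns null

    Tracker(long from, long to, boolean splittable) {
      this.range = new Range(from, to);
      this.splittable = splittable;
    }

    Range currentRestriction() {
      return range;
    }

    /** Records the attempt and reports whether the offset lies inside the restriction. */
    boolean tryClaim(long offset) {
      lastAttempted = offset;
      return offset < range.to;
    }

    /** Models trySplit(0.0): shrink the primary to what was attempted, return the residual. */
    Range checkpoint() {
      if (!splittable || lastAttempted >= range.to - 1) {
        return null; // either splitting is unsupported or nothing is left to hand back
      }
      long splitPos = Math.max(range.from, lastAttempted + 1);
      Range residual = new Range(splitPos, range.to);
      range = new Range(range.from, splitPos);
      return residual;
    }

    /** Throws if any part of the (possibly shrunk) primary restriction was never attempted. */
    void checkDone() {
      if (range.from == range.to) {
        return; // empty restriction, nothing to verify
      }
      if (lastAttempted < range.to - 1) {
        throw new IllegalStateException(
            "Last attempted offset was " + lastAttempted + " in range " + range
                + ", claiming work in [" + (lastAttempted + 1) + ", " + range.to
                + ") was not attempted");
      }
    }
  }

  public static void main(String[] args) {
    // Case 1: the tracker supports checkpointing, so resume() is safe.
    Tracker ok = new Tracker(0, Long.MAX_VALUE, true);
    ok.tryClaim(5);
    Range residual = ok.checkpoint();
    ok.checkDone(); // passes: the primary is now [0, 6) and offset 5 was attempted
    System.out.println("primary=" + ok.currentRestriction() + " residual=" + residual);

    // Case 2: no split is produced, so the primary stays unbounded and checkDone fails.
    Tracker bad = new Tracker(1998348L, Long.MAX_VALUE, false);
    bad.tryClaim(3188438L);
    bad.checkpoint(); // returns null
    try {
      bad.checkDone();
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The point of the second case is that checkDone is always evaluated against whatever the primary restriction is after the split attempt, so a tracker that never splits leaves an unbounded primary that cannot pass the check.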
On Fri, Nov 27, 2020 at 11:05 PM Daniel Collins <[email protected]>
wrote:
> I think the documentation for trySplit() doesn't make it clear that it
> supports this use case. In particular this section:
>
> > This invocation updates the {@link #currentRestriction()} to be the
> primary restriction effectively having the current {@link
> DoFn.ProcessElement} execution responsible for performing the work that the
> primary restriction represents. The residual restriction will be executed
> in a separate {@link DoFn.ProcessElement} invocation (likely in a different
> process). The work performed by executing the primary and residual
> restrictions as separate {@link DoFn.ProcessElement} invocations MUST be
> equivalent to the work performed as if this split never occurred.
>
> Implies that the runner will try to run both restrictions again on
> separate workers. This is not the behavior I am looking for, hence my
> confusion. Can we change the documentation here to make clear that
> checkDone will be called on the primary restriction in the output to ensure
> that it is actually completed if the trySplit call was triggered by a call
> to resume()?
>
> On Sat, Nov 28, 2020 at 1:58 AM Boyuan Zhang <[email protected]> wrote:
>
>> And if you look into the RestrictionTracker javadoc[1], it explains
>> what it means when you return null from trySplit.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L104-L107
>>
>> On Fri, Nov 27, 2020 at 10:54 PM Boyuan Zhang <[email protected]> wrote:
>>
>>> To the extent I can see, this never mentions the restriction that you
>>>> <need> to implement a split() that returns a bounded restriction if
>>>> returning resume() from an SDF. Nor does this restriction particularly make
>>>> sense if the range being processed is itself unbounded? Perhaps you would
>>>> consider not calling checkDone() on resume() if the restriction provided to
>>>> the runner is unbounded since it would be unreasonable to complete an
>>>> unbounded restriction?
>>>
>>>
>>> It seems like you are not familiar with how Beam deals with resume(), so
>>> let's start from this part. Let's say that your SDF is processing a
>>> restriction of [0, MAX) and so far you have done tryClaim(5), and you
>>> want to return resume() at this point. When you return resume() from here,
>>> the Beam Java DoFn invoker will know you want to resume and call your
>>> restrictionTracker.trySplit(fractionOfRemainder=0.0) to get the residual
>>> from [0, MAX). Ideally, your RestrictionTracker should return [0, 6) as
>>> your current restriction and [6, MAX) as the residual. Then the Beam Java
>>> DoFn invoker will call checkDone on the restriction [0, 6) to double check
>>> that your SDF has called tryClaim(5), ensuring there is no data loss. Then
>>> the SDK will return the [6, MAX) restriction back to the runner (in your
>>> case that's Dataflow) and the runner will reschedule [6, MAX) based on its
>>> scheduling strategy. That's why if you want to use resume(), you need to
>>> implement trySplit. That's also why trySplit and checkDone make sense on
>>> an unbounded restriction.
>>>
>>> Perhaps it would be better to explain in terms of why I'm trying to do
>>>> this. If the subscription has not received any data in a while, or is
>>>> receiving data infrequently, I want to enable dataflow to scale down to 1
>>>> worker, but there will be no need to call "tryClaim" if there is no new
>>>> data from Pub/Sub Lite. All I want to do is, if data is arriving
>>>> infrequently, give dataflow the opportunity to scale my job down. I'm not
>>>> actually done with the data because, as new data can always be published in
>>>> the future, we can't know that, and I'm trying to avoid needing to
>>>> implement bounded reads to artificially produce sub-windows when an
>>>> unbounded output is much more natural.
>>>
>>>
>>> So you are referring to the resume use case. Please note that even
>>> though you are returning resume() from your SDF, that doesn't mean
>>> Dataflow will guarantee that the workers will be downscaled to 1. But
>>> resume() can indeed help you free some workers to process other work,
>>> compared to having your SDF busy-wait.
>>>
>>> This is good to know. So to rephrase, could I periodically call
>>>> tryClaim(<last sent message or getFrom if none yet>) to yield control back
>>>> to the runtime?
>>>
>>>
>>> You can do so by implementing RestrictionTracker.trySplit() and using
>>> resume().
>>>
>>> You may also want to take a look at Kafka example[1]. Hope that is
>>> helpful.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
>>>
>>>
>>>
>>> On Fri, Nov 27, 2020 at 10:34 PM Daniel Collins <[email protected]>
>>> wrote:
>>>
>>>> > There is an up-to-date SDF programming guide[1], and specifically [2]
>>>> talks about SDF-initiated checkpointing
>>>>
>>>> To the extent I can see, this never mentions the restriction that you
>>>> <need> to implement a split() that returns a bounded restriction if
>>>> returning resume() from an SDF. Nor does this restriction particularly make
>>>> sense if the range being processed is itself unbounded? Perhaps you would
>>>> consider not calling checkDone() on resume() if the restriction provided to
>>>> the runner is unbounded since it would be unreasonable to complete an
>>>> unbounded restriction?
>>>>
>>>> > It depends on the definition of no new data to enable rescheduling.
>>>>
>>>> Perhaps it would be better to explain in terms of why I'm trying to do
>>>> this. If the subscription has not received any data in a while, or is
>>>> receiving data infrequently, I want to enable dataflow to scale down to 1
>>>> worker, but there will be no need to call "tryClaim" if there is no new
>>>> data from Pub/Sub Lite. All I want to do is, if data is arriving
>>>> infrequently, give dataflow the opportunity to scale my job down. I'm not
>>>> actually done with the data because, as new data can always be published in
>>>> the future, we can't know that, and I'm trying to avoid needing to
>>>> implement bounded reads to artificially produce sub-windows when an
>>>> unbounded output is much more natural.
>>>>
>>>> > Please note that when an SDF is processing one element
>>>> restriction pair, the start of the restriction is never changed. You will
>>>> always get the same offset when you call currentRestriction().getFrom().
>>>>
>>>> This is good to know. So to rephrase, could I periodically call
>>>> tryClaim(<last sent message or getFrom if none yet>) to yield control back
>>>> to the runtime?
>>>>
>>>> -Dan
>>>>
>>>>
>>>>
>>>> On Sat, Nov 28, 2020 at 1:21 AM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>>
>>>>> > Okay, is this documented anywhere? In particular,
>>>>> https://s.apache.org/splittable-do-fn seems out of date, since it
>>>>> implies resume() should be returned when tryClaim returns false.
>>>>>
>>>>> There is an up-to-date SDF programming guide[1], and specifically [2]
>>>>> talks about SDF-initiated checkpointing. stop() should be returned
>>>>> when tryClaim returns false, whereas resume() is expected to be returned
>>>>> when the restriction is not fully processed and you want to defer
>>>>> processing to the future.
>>>>>
>>>>> > If this is the case, is there any way I can yield control to the
>>>>> runtime if I have no new data to enable rescheduling? For example, can I
>>>>> call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>>>
>>>>> It depends on the definition of no new data to enable rescheduling.
>>>>> If you believe that you are done with the current restriction even though
>>>>> you have not reached the end of the restriction, you can explicitly say
>>>>> you are done with it by calling restrictionTracker.tryClaim(MAX_LONG) (or
>>>>> tryClaim(restriction.getTo()) if you are sure the end of your restriction
>>>>> has not been changed by any splitting).
>>>>> If you just want to re-process the rest of the restriction after a
>>>>> certain time, e.g., 5 mins, 30 mins and so on, you need to implement
>>>>> trySplit and return resume(duration) when you want to resume.
>>>>>
>>>>> Please note that when an SDF is processing one element restriction
>>>>> pair, the start of the restriction is never changed. You will always get
>>>>> the same offset when you call currentRestriction().getFrom().
>>>>>
>>>>>
>>>>> [1]
>>>>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>>>> [2]
>>>>> https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint
>>>>>
>>>>> On Fri, Nov 27, 2020 at 10:07 PM Daniel Collins <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> > you can either never return resume() from your SDF or implement
>>>>>> suitable trySplit() logic for your RestrictionTracker
>>>>>>
>>>>>> Okay, is this documented anywhere? In particular,
>>>>>> https://s.apache.org/splittable-do-fn seems out of date, since it
>>>>>> implies resume() should be returned when tryClaim returns false.
>>>>>>
>>>>>> If this is the case, is there any way I can yield control to the
>>>>>> runtime if I have no new data to enable rescheduling? For example, can I
>>>>>> call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>>>>
>>>>>> On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> > IIUC, this should never happen as long as I return null to
>>>>>>> trySplit. Is this not the case? (trySplit implementation below)
>>>>>>>
>>>>>>> I noticed that in your implementation you return null for your
>>>>>>> RestrictionTracker.trySplit. That means you cannot return resume() from
>>>>>>> your SplittableDoFn.process() body since resume() means
>>>>>>> performing SplittableDoFn self-initiated checkpointing and
>>>>>>> deferring processing residuals.
>>>>>>>
>>>>>>> In your case, you can either never return resume() from your SDF or
>>>>>>> implement suitable trySplit() logic for your RestrictionTracker. For
>>>>>>> example, KafkaIO is using GrowableOffsetRangeTracker[1] to track an
>>>>>>> infinite restriction.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
>>>>>>>
>>>>>>> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> > Please note that your current restriction might be changed to a
>>>>>>>> finite restriction during processing one bundle if you do
>>>>>>>> SplittableDoFn
>>>>>>>> self-initiated checkpointing or any runner issued splits
>>>>>>>>
>>>>>>>> IIUC, this should never happen as long as I return null to
>>>>>>>> trySplit. Is this not the case? (trySplit implementation below)
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public SplitResult<OffsetRange> trySplit(double
>>>>>>>> fractionOfRemainder) { return null; }
>>>>>>>>
>>>>>>>> > what will you do if we reach the finally block?
>>>>>>>>
>>>>>>>> At that point an exception is being thrown out of the
>>>>>>>> processElement function. The answer to that would be "what will the
>>>>>>>> runtime
>>>>>>>> do if an exception is thrown out of the processElement function"
>>>>>>>>
>>>>>>>> > Open a WIP PR
>>>>>>>>
>>>>>>>> I have, but I'm staging changes in a separate repo. See
>>>>>>>> https://github.com/googleapis/java-pubsublite/pull/390 (although
>>>>>>>> this incorporates other changes, see PubsubLitePartitionSdf.java
>>>>>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1>
>>>>>>>> and PubsubLiteOffsetRangeTracker.java
>>>>>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67>
>>>>>>>> for
>>>>>>>> the sdf and restriction tracker implementations)
>>>>>>>>
>>>>>>>> -Dan
>>>>>>>>
>>>>>>>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Boyuan,
>>>>>>>>>>
>>>>>>>>>> Responses inline.
>>>>>>>>>>
>>>>>>>>>> > The checkDone is invoked by the SDK harness to guarantee that
>>>>>>>>>> when you exit your SplittableDoFn.process you must have completed all
>>>>>>>>>> the work in the current restriction
>>>>>>>>>>
>>>>>>>>>> This is impossible with unbounded restrictions since, by
>>>>>>>>>> definition, all work cannot be completed.
>>>>>>>>>>
>>>>>>>>> Please note that your current restriction might be changed to a
>>>>>>>>> finite restriction during processing one bundle if you do
>>>>>>>>> SplittableDoFn
>>>>>>>>> self-initiated checkpointing or any runner issued splits.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> > In your case, it seems like after you do tryClaim(3188439), you
>>>>>>>>>> return stop() directly from your SplittableDoFn.process function
>>>>>>>>>>
>>>>>>>>>> This is not true. The code in question is below. stop() is only
>>>>>>>>>> returned if tryClaim returns false.
>>>>>>>>>>
>>>>>>>>>> -Dan
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>>>>>>>>>   logger.atWarning().log("Failed to claim initial restriction for partition " + partition);
>>>>>>>>>>   return ProcessContinuation.stop();
>>>>>>>>>> }
>>>>>>>>>> sleepTimeRemaining = maxSleepTime;
>>>>>>>>>> Committer committer = committerSupplier.apply(partition);
>>>>>>>>>> committer.startAsync().awaitRunning();
>>>>>>>>>> try (PullSubscriber<SequencedMessage> subscriber =
>>>>>>>>>>     subscriberFactory.apply(partition, Offset.of(tracker.currentRestriction().getFrom()))) {
>>>>>>>>>>   while (true) {
>>>>>>>>>>     List<SequencedMessage> messages = doPoll(subscriber);
>>>>>>>>>>     // We polled for as long as possible, yield to the runtime to allow it to reschedule us on
>>>>>>>>>>     // a new task.
>>>>>>>>>>     if (messages.isEmpty()) {
>>>>>>>>>>       logger.atWarning().log("Yielding due to timeout on partition " + partition);
>>>>>>>>>>       return ProcessContinuation.resume();
>>>>>>>>>>     }
>>>>>>>>>>     long lastOffset = Iterables.getLast(messages).offset().value();
>>>>>>>>>>     if (tracker.tryClaim(lastOffset)) {
>>>>>>>>>>       messages.forEach(
>>>>>>>>>>           message ->
>>>>>>>>>>               receiver.outputWithTimestamp(
>>>>>>>>>>                   message, new Instant(Timestamps.toMillis(message.publishTime()))));
>>>>>>>>>>       committer.commitOffset(Offset.of(lastOffset + 1)).get();
>>>>>>>>>>     } else {
>>>>>>>>>>       logger.atWarning().log("Stopping partition " + partition);
>>>>>>>>>>       return ProcessContinuation.stop();
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>> } finally {
>>>>>>>>>>   committer.stopAsync().awaitTerminated();
>>>>>>>>>> }
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>> From your code, what will you do if we reach the finally block?
>>>>>>>>> Would you like to open a WIP PR to show more details?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Daniel,
>>>>>>>>>>>
>>>>>>>>>>> The checkDone is invoked by the SDK harness to guarantee that
>>>>>>>>>>> when you exit your SplittableDoFn.process (whether you return stop()
>>>>>>>>>>> or resume()), you must have completed all the work in the current
>>>>>>>>>>> restriction.
>>>>>>>>>>> This is one of the major ways for SplittableDoFn to prevent data loss.
>>>>>>>>>>>
>>>>>>>>>>> In your case, it seems like after you do tryClaim(3188439), you
>>>>>>>>>>> return stop() directly from your SplittableDoFn.process function.
>>>>>>>>>>> That's not the correct way to work with a restriction and
>>>>>>>>>>> restriction tracker.
>>>>>>>>>>> You should either return resume() to perform a SplittableDoFn
>>>>>>>>>>> initiated checkpoint to defer processing restriction [3188439,
>>>>>>>>>>> 9223372036854775807),
>>>>>>>>>>> or you should return stop() only when tryClaim() returns false.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running
>>>>>>>>>>>> into the following error on dataflow with a RestrictionTracker
>>>>>>>>>>>> returning
>>>>>>>>>>>> UNBOUNDED to isBounded. It looks like calls are being made to
>>>>>>>>>>>> `checkDone`,
>>>>>>>>>>>> but looking at the documentation of `checkDone`, I don't think
>>>>>>>>>>>> there's any
>>>>>>>>>>>> rational thing I can do in this case. Does anyone know what should
>>>>>>>>>>>> be done
>>>>>>>>>>>> for this method?
>>>>>>>>>>>>
>>>>>>>>>>>> The following exist in the RestrictionTracker javadoc:
>>>>>>>>>>>> -Must throw an exception with an informative error message, if
>>>>>>>>>>>> there is still any unclaimed work remaining in the restriction.
>>>>>>>>>>>> (there is,
>>>>>>>>>>>> the restriction is unbounded)
>>>>>>>>>>>> -{@link RestrictionTracker#checkDone} MUST succeed
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>
>>>>>>>>>>>> Dan
>>>>>>>>>>>>
>>>>>>>>>>>> "Error message from worker: java.lang.IllegalStateException:
>>>>>>>>>>>> Last attempted offset was 3188438 in range [1998348,
>>>>>>>>>>>> 9223372036854775807),
>>>>>>>>>>>> claiming work in [3188439, 9223372036854775807) was not attempted
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>>>>>>>>>
>>>>>>>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
>>>>>>>>>>>>
>>>>>>>>>>>