You need to update your current restriction when you do trySplit. That
means your current restriction should be the primary of the split result.
Please refer to the existing OffsetRangeTracker implementation[1] for more
details.
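
To make that concrete, here is a minimal sketch of an unbounded tracker that
shrinks its own restriction to the primary when it splits. This is illustrative
only, not the Pub/Sub Lite code: the class and field names are invented, and
only the checkpoint split (fractionOfRemainder == 0.0) is handled.

```java
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;

class UnboundedOffsetTrackerSketch extends RestrictionTracker<OffsetRange, Long> {
  private OffsetRange range;      // shrunk in place by trySplit
  private Long lastClaimedOffset; // last offset successfully claimed, or null

  UnboundedOffsetTrackerSketch(OffsetRange initial) {
    this.range = initial;
  }

  @Override
  public boolean tryClaim(Long offset) {
    if (offset >= range.getTo()) {
      return false;
    }
    lastClaimedOffset = offset;
    return true;
  }

  @Override
  public OffsetRange currentRestriction() {
    return range;
  }

  @Override
  public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
    // Checkpoint at the first unclaimed offset.
    long splitPoint =
        lastClaimedOffset == null ? range.getFrom() : lastClaimedOffset + 1;
    OffsetRange primary = new OffsetRange(range.getFrom(), splitPoint);
    OffsetRange residual = new OffsetRange(splitPoint, range.getTo());
    // Key step: currentRestriction() now reports the primary, so the
    // checkDone() call made after resume() only covers claimed work.
    this.range = primary;
    return SplitResult.of(primary, residual);
  }

  @Override
  public void checkDone() throws IllegalStateException {
    long done =
        lastClaimedOffset == null ? range.getFrom() : lastClaimedOffset + 1;
    if (done < range.getTo()) {
      throw new IllegalStateException("Unclaimed work remains in " + range);
    }
  }

  @Override
  public IsBounded isBounded() {
    return IsBounded.UNBOUNDED;
  }
}
```

With a tracker along these lines, returning resume() from process() gives the
invoker a primary covering exactly the claimed offsets and a residual that the
runner can reschedule later.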

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java#L81

On Fri, Nov 27, 2020 at 11:48 PM Daniel Collins <[email protected]>
wrote:

> Ah, I appear to have missed this line, which indicates that the current
> RestrictionTracker must be updated.
>
> This invocation updates the {@link #currentRestriction()} to be the primary
> restriction effectively having the current {@link DoFn.ProcessElement}
> execution responsible for performing the work that the primary restriction
> represents.
>
> On Sat, Nov 28, 2020 at 2:45 AM Daniel Collins <[email protected]>
> wrote:
>
>> This does not appear to work: the checkDone call, as far as I can tell, is
>> made on the existing range, not the split range, based on the following
>> error:
>>
>> Error message from worker: java.lang.IllegalStateException: Last
>> attempted offset was 4601978 in range [2975759, 9223372036854775807),
>> claiming work in [4601979, 9223372036854775807) was not attempted
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>
>> On Sat, Nov 28, 2020 at 2:18 AM Daniel Collins <[email protected]>
>> wrote:
>>
>>> Can you confirm that the following implementation of trySplit will work
>>> as intended (from an OffsetRangeTracker subclass)?
>>>
>>> @Override
>>> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>>>   return SplitResult.of(
>>>       new OffsetRange(currentRestriction().getFrom(), lastClaimedOffset + 1),
>>>       new OffsetRange(lastClaimedOffset + 1, Long.MAX_VALUE));
>>> }
>>>
>>> > It would be nice to update the documentation if that's confusing.
>>>
>>> If you could please update this (if this is indeed the case) to confirm
>>> that it ensures that there will never be two unbounded restrictions sent to
>>> DoFns running at the same time using this pattern, that would be great.
>>>
>>> In addition, I'm not quite sure how this works? When the 'trySplit' call
>>> occurs, it returns two OffsetRanges, which don't yet include information
>>> about claimed offsets. How is the first half of this converted to my
>>> OffsetRangeTracker subclass with offsets already claimed? Does the runtime
>>> call (and importantly, is it required to call):
>>>
>>> MyRestrictionTracker tracker = MyDoFn.newTracker(splitResult.getPrimary());
>>> tracker.tryClaim(previousClaimed);
>>> tracker.checkDone();
>>>
>>> On Sat, Nov 28, 2020 at 2:08 AM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> It would be nice to update the documentation if that's confusing.
>>>>
>>>> On Fri, Nov 27, 2020 at 11:05 PM Daniel Collins <[email protected]>
>>>> wrote:
>>>>
>>>>> I think the documentation for trySplit() doesn't make it clear that it
>>>>> supports this use case. In particular this section:
>>>>>
>>>>> > This invocation updates the {@link #currentRestriction()} to be the
>>>>> primary restriction effectively having the current {@link
>>>>> DoFn.ProcessElement} execution responsible for performing the work that
>>>>> the primary restriction represents. The residual restriction will be
>>>>> executed in a separate {@link DoFn.ProcessElement} invocation (likely in a
>>>>> different process). The work performed by executing the primary and
>>>>> residual restrictions as separate {@link DoFn.ProcessElement} invocations
>>>>> MUST be equivalent to the work performed as if this split never occurred.
>>>>>
>>>>> This implies that the runner will try to run both restrictions again on
>>>>> separate workers, which is not the behavior I am looking for, hence my
>>>>> confusion. Can we change the documentation here to make clear that
>>>>> checkDone will be called on the primary restriction in the output, to
>>>>> ensure that it is actually completed, if the trySplit call was triggered
>>>>> by a call to resume()?
>>>>>
>>>>> On Sat, Nov 28, 2020 at 1:58 AM Boyuan Zhang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> And if you look into the RestrictionTracker javadoc[1], it mentions what
>>>>>> it means when you return null from trySplit.
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L104-L107
>>>>>>
>>>>>> On Fri, Nov 27, 2020 at 10:54 PM Boyuan Zhang <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>>> To the extent I can see, this never mentions the requirement that you
>>>>>>>> <need> to implement a split() that returns a bounded restriction if
>>>>>>>> returning resume() from an SDF. Nor does this requirement particularly
>>>>>>>> make sense if the range being processed is itself unbounded. Perhaps you
>>>>>>>> would consider not calling checkDone() on resume() if the restriction
>>>>>>>> provided to the runner is unbounded, since it would be unreasonable to
>>>>>>>> complete an unbounded restriction?
>>>>>>>
>>>>>>>
>>>>>>> It seems like you are not familiar with how Beam deals with resume(),
>>>>>>> so let's start there. Let's say that your SDF is processing a restriction
>>>>>>> of [0, MAX) and so far you have done tryClaim(5), and you want to return
>>>>>>> resume() at this point. When you return resume() from here, the Beam Java
>>>>>>> DoFn invoker will know you want to resume and call your
>>>>>>> restrictionTracker.trySplit(fractionOfRemainder=0.0) to get the residual
>>>>>>> from [0, MAX). Ideally, your RestrictionTracker should return [0, 6) as
>>>>>>> your current restriction and [6, MAX) as the residual. Then the Beam Java
>>>>>>> DoFn invoker will call checkDone on the restriction [0, 6) to double-check
>>>>>>> that your SDF has called tryClaim(5), ensuring there is no data loss. Then
>>>>>>> the SDK will return the [6, MAX) restriction back to the runner (in your
>>>>>>> case, Dataflow), and the runner will reschedule [6, MAX) based on its
>>>>>>> scheduling strategy. That's why, if you want to use resume(), you need to
>>>>>>> implement trySplit. That's also why trySplit and checkDone make sense on
>>>>>>> an unbounded restriction.
>>>>>>>
>>>>>>>> Perhaps it would be better to explain this in terms of why I'm trying
>>>>>>>> to do it. If the subscription has not received any data in a while, or
>>>>>>>> is receiving data infrequently, I want to enable Dataflow to scale down
>>>>>>>> to 1 worker, but there will be no need to call "tryClaim" if there is no
>>>>>>>> new data from Pub/Sub Lite. All I want to do is give Dataflow the
>>>>>>>> opportunity to scale my job down if data is arriving infrequently. I'm
>>>>>>>> not actually done with the data, since new data can always be published
>>>>>>>> in the future, and I'm trying to avoid implementing bounded reads to
>>>>>>>> artificially produce sub-windows when an unbounded output is much more
>>>>>>>> natural.
>>>>>>>
>>>>>>>
>>>>>>> So you are referring to the resume use case. Please note that even
>>>>>>> though you are returning resume() from your SDF, that doesn't mean
>>>>>>> Dataflow guarantees the job will be downscaled to 1 worker. But resume()
>>>>>>> can indeed help you free some workers to process other work, compared to
>>>>>>> having your SDF busy-wait.
>>>>>>>
>>>>>>> This is good to know. So to rephrase, could I periodically call
>>>>>>>> tryClaim(<last sent message or getFrom if none yet>) to yield control 
>>>>>>>> back
>>>>>>>> to the runtime?
>>>>>>>
>>>>>>>
>>>>>>> You can do so by implementing RestrictionTracker.trySplit() and
>>>>>>> using resume().
>>>>>>>
>>>>>>> You may also want to take a look at the Kafka example[1]. Hope that is
>>>>>>> helpful.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Nov 27, 2020 at 10:34 PM Daniel Collins <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> > There is an up-to-date SDF programming guide[1], and [2] in
>>>>>>>> particular talks about SDF-initiated checkpointing
>>>>>>>>
>>>>>>>> To the extent I can see, this never mentions the requirement that you
>>>>>>>> <need> to implement a split() that returns a bounded restriction if
>>>>>>>> returning resume() from an SDF. Nor does this requirement particularly
>>>>>>>> make sense if the range being processed is itself unbounded. Perhaps you
>>>>>>>> would consider not calling checkDone() on resume() if the restriction
>>>>>>>> provided to the runner is unbounded, since it would be unreasonable to
>>>>>>>> complete an unbounded restriction?
>>>>>>>>
>>>>>>>> > It depends on the definition of "no new data to enable
>>>>>>>> rescheduling".
>>>>>>>>
>>>>>>>> Perhaps it would be better to explain this in terms of why I'm trying
>>>>>>>> to do it. If the subscription has not received any data in a while, or
>>>>>>>> is receiving data infrequently, I want to enable Dataflow to scale down
>>>>>>>> to 1 worker, but there will be no need to call "tryClaim" if there is no
>>>>>>>> new data from Pub/Sub Lite. All I want to do is give Dataflow the
>>>>>>>> opportunity to scale my job down if data is arriving infrequently. I'm
>>>>>>>> not actually done with the data, since new data can always be published
>>>>>>>> in the future, and I'm trying to avoid implementing bounded reads to
>>>>>>>> artificially produce sub-windows when an unbounded output is much more
>>>>>>>> natural.
>>>>>>>>
>>>>>>>> > Please note that when an SDF is processing one element/restriction
>>>>>>>> pair, the start of the restriction is never changed. You will always get
>>>>>>>> the same offset when you call currentRestriction().getFrom().
>>>>>>>>
>>>>>>>> This is good to know. So to rephrase, could I periodically call
>>>>>>>> tryClaim(<last sent message or getFrom if none yet>) to yield control 
>>>>>>>> back
>>>>>>>> to the runtime?
>>>>>>>>
>>>>>>>> -Dan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Nov 28, 2020 at 1:21 AM Boyuan Zhang <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> > Okay, is this documented anywhere? In particular,
>>>>>>>>> https://s.apache.org/splittable-do-fn seems out of date, since it
>>>>>>>>> implies resume() should be returned when tryClaim returns false.
>>>>>>>>>
>>>>>>>>> There is an up-to-date SDF programming guide[1], and [2] in particular
>>>>>>>>> talks about SDF-initiated checkpointing. stop() should be returned when
>>>>>>>>> tryClaim returns false, whereas resume() is expected to be returned when
>>>>>>>>> the restriction is not fully processed and you want to defer processing
>>>>>>>>> to the future.
>>>>>>>>>
>>>>>>>>> > If this is the case, is there any way I can yield control to the
>>>>>>>>> runtime if I have no new data to enable rescheduling? For example, 
>>>>>>>>> can I
>>>>>>>>> call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>>>>>>>
>>>>>>>>> It depends on the definition of "no new data to enable rescheduling".
>>>>>>>>> If you believe that you are done with the current restriction even
>>>>>>>>> though you have not reached the end of the restriction, you can
>>>>>>>>> explicitly say you are done with the current one by calling
>>>>>>>>> restrictionTracker.tryClaim(MAX_LONG) (or tryClaim(restriction.getTo())
>>>>>>>>> if you are sure the end of your restriction is not changed by any
>>>>>>>>> splitting).
>>>>>>>>> If you just want to re-process the rest of the restriction after a
>>>>>>>>> certain time, e.g., 5 mins, 30 mins, and so on, you need to implement
>>>>>>>>> trySplit and return resume(duration) when you want to resume.
>>>>>>>>>
>>>>>>>>> Please note that when an SDF is processing one element/restriction
>>>>>>>>> pair, the start of the restriction is never changed. You will always
>>>>>>>>> get the same offset when you call currentRestriction().getFrom().
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>>>>>>>> [2]
>>>>>>>>> https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint
>>>>>>>>>
>>>>>>>>> On Fri, Nov 27, 2020 at 10:07 PM Daniel Collins <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> > you can either never return resume() from your SDF or implement
>>>>>>>>>> suitable trySplit() logic for your RestrictionTracker
>>>>>>>>>>
>>>>>>>>>> Okay, is this documented anywhere? In particular,
>>>>>>>>>> https://s.apache.org/splittable-do-fn seems out of date, since
>>>>>>>>>> it implies resume() should be returned when tryClaim returns false.
>>>>>>>>>>
>>>>>>>>>> If this is the case, is there any way I can yield control to the
>>>>>>>>>> runtime if I have no new data to enable rescheduling? For example, 
>>>>>>>>>> can I
>>>>>>>>>> call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>>>>>>>>
>>>>>>>>>> On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> > IIUC, this should never happen as long as I return null to
>>>>>>>>>>> trySplit. Is this not the case? (trySplit implementation below)
>>>>>>>>>>>
>>>>>>>>>>> I noticed that in your implementation you return null from your
>>>>>>>>>>> RestrictionTracker.trySplit. That means you cannot return resume()
>>>>>>>>>>> from your SplittableDoFn.process() body, since resume() means
>>>>>>>>>>> performing SplittableDoFn self-initiated checkpointing and deferring
>>>>>>>>>>> processing of the residual.
>>>>>>>>>>>
>>>>>>>>>>> In your case, you can either never return resume() from your SDF
>>>>>>>>>>> or implement suitable trySplit() logic for your RestrictionTracker. 
>>>>>>>>>>> For
>>>>>>>>>>> example, KafkaIO is using GrowableOffsetRangeTracker[1] to track an
>>>>>>>>>>> infinite restriction.
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> > Please note that your current restriction might be changed to a
>>>>>>>>>>>> finite restriction while processing one bundle if you do
>>>>>>>>>>>> SplittableDoFn self-initiated checkpointing or there are any
>>>>>>>>>>>> runner-issued splits
>>>>>>>>>>>>
>>>>>>>>>>>> IIUC, this should never happen as long as I return null to
>>>>>>>>>>>> trySplit. Is this not the case? (trySplit implementation below)
>>>>>>>>>>>>
>>>>>>>>>>>> @Override
>>>>>>>>>>>> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>>>>>>>>>>>>   return null;
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> > what will you do if we reach the finally block?
>>>>>>>>>>>>
>>>>>>>>>>>> At that point an exception is being thrown out of the
>>>>>>>>>>>> processElement function. The answer to that would be "what will 
>>>>>>>>>>>> the runtime
>>>>>>>>>>>> do if an exception is thrown out of the processElement function"
>>>>>>>>>>>>
>>>>>>>>>>>> > Open a WIP PR
>>>>>>>>>>>>
>>>>>>>>>>>> I have, but I'm staging changes in a separate repo. See
>>>>>>>>>>>> https://github.com/googleapis/java-pubsublite/pull/390
>>>>>>>>>>>> (although this incorporates other changes, see
>>>>>>>>>>>> PubsubLitePartitionSdf.java
>>>>>>>>>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1>
>>>>>>>>>>>>  and PubsubLiteOffsetRangeTracker.java
>>>>>>>>>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67>
>>>>>>>>>>>>  for
>>>>>>>>>>>> the sdf and restriction tracker implementations)
>>>>>>>>>>>>
>>>>>>>>>>>> -Dan
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Boyuan,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Responses inline.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > checkDone is invoked by the SDK harness to guarantee that when
>>>>>>>>>>>>>> you exit your SplittableDoFn.process you have completed all the
>>>>>>>>>>>>>> work in the current restriction
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is impossible with unbounded restrictions since, by
>>>>>>>>>>>>>> definition, all work cannot be completed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>> Please note that your current restriction might be changed to a
>>>>>>>>>>>>> finite restriction while processing one bundle if you do
>>>>>>>>>>>>> SplittableDoFn self-initiated checkpointing or there are any
>>>>>>>>>>>>> runner-issued splits.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > In your case, it seems like after you do tryClaim(3188439),
>>>>>>>>>>>>>> you return stop() directly from your SplittableDoFn.process 
>>>>>>>>>>>>>> function
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is not true. The code in question is below. stop() is
>>>>>>>>>>>>>> only returned if tryClaim returns false.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Dan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>>>>>>>>>>>>>   logger.atWarning().log(
>>>>>>>>>>>>>>       "Failed to claim initial restriction for partition " + partition);
>>>>>>>>>>>>>>   return ProcessContinuation.stop();
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> sleepTimeRemaining = maxSleepTime;
>>>>>>>>>>>>>> Committer committer = committerSupplier.apply(partition);
>>>>>>>>>>>>>> committer.startAsync().awaitRunning();
>>>>>>>>>>>>>> try (PullSubscriber<SequencedMessage> subscriber =
>>>>>>>>>>>>>>     subscriberFactory.apply(
>>>>>>>>>>>>>>         partition, Offset.of(tracker.currentRestriction().getFrom()))) {
>>>>>>>>>>>>>>   while (true) {
>>>>>>>>>>>>>>     List<SequencedMessage> messages = doPoll(subscriber);
>>>>>>>>>>>>>>     // We polled for as long as possible; yield to the runtime to allow it
>>>>>>>>>>>>>>     // to reschedule us on a new task.
>>>>>>>>>>>>>>     if (messages.isEmpty()) {
>>>>>>>>>>>>>>       logger.atWarning().log("Yielding due to timeout on partition " + partition);
>>>>>>>>>>>>>>       return ProcessContinuation.resume();
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>     long lastOffset = Iterables.getLast(messages).offset().value();
>>>>>>>>>>>>>>     if (tracker.tryClaim(lastOffset)) {
>>>>>>>>>>>>>>       messages.forEach(
>>>>>>>>>>>>>>           message ->
>>>>>>>>>>>>>>               receiver.outputWithTimestamp(
>>>>>>>>>>>>>>                   message, new Instant(Timestamps.toMillis(message.publishTime()))));
>>>>>>>>>>>>>>       committer.commitOffset(Offset.of(lastOffset + 1)).get();
>>>>>>>>>>>>>>     } else {
>>>>>>>>>>>>>>       logger.atWarning().log("Stopping partition " + partition);
>>>>>>>>>>>>>>       return ProcessContinuation.stop();
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> } finally {
>>>>>>>>>>>>>>   committer.stopAsync().awaitTerminated();
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>
>>>>>>>>>>>>> From your code, what will you do if we reach the finally
>>>>>>>>>>>>> block? Would you like to open a WIP PR to show more details?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Daniel,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> checkDone is invoked by the SDK harness to guarantee that when
>>>>>>>>>>>>>>> you exit your SplittableDoFn.process (whether you return stop() or
>>>>>>>>>>>>>>> resume()), you have completed all the work in the current
>>>>>>>>>>>>>>> restriction. This is one of the major ways SplittableDoFn prevents
>>>>>>>>>>>>>>> data loss.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In your case, it seems like after you do tryClaim(3188439),
>>>>>>>>>>>>>>> you return stop() directly from your SplittableDoFn.process
>>>>>>>>>>>>>>> function. That's not the correct way to work with a restriction
>>>>>>>>>>>>>>> and restriction tracker. You should either return resume() to
>>>>>>>>>>>>>>> perform an SDF-initiated checkpoint and defer processing the
>>>>>>>>>>>>>>> restriction [3188439, 9223372036854775807), or return stop() only
>>>>>>>>>>>>>>> when tryClaim() has returned false.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running
>>>>>>>>>>>>>>>> into the following error on Dataflow with a RestrictionTracker
>>>>>>>>>>>>>>>> returning UNBOUNDED from isBounded. It looks like calls are being
>>>>>>>>>>>>>>>> made to `checkDone`, but looking at the documentation of
>>>>>>>>>>>>>>>> `checkDone`, I don't think there's any rational thing I can do in
>>>>>>>>>>>>>>>> this case. Does anyone know what should be done for this method?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The following exist in the RestrictionTracker javadoc:
>>>>>>>>>>>>>>>> - Must throw an exception with an informative error message, if
>>>>>>>>>>>>>>>> there is still any unclaimed work remaining in the restriction.
>>>>>>>>>>>>>>>> (There is: the restriction is unbounded.)
>>>>>>>>>>>>>>>> - {@link RestrictionTracker#checkDone} MUST succeed
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Dan
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> "Error message from worker:
>>>>>>>>>>>>>>>> java.lang.IllegalStateException: Last attempted offset was 
>>>>>>>>>>>>>>>> 3188438 in range
>>>>>>>>>>>>>>>> [1998348, 9223372036854775807), claiming work in [3188439,
>>>>>>>>>>>>>>>> 9223372036854775807) was not attempted
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
