You need to update your current restriction when you do trySplit. That means your current restriction should be the primary of the split result. Please refer to the existing OffsetRangeTracker implementation[1] for more details.

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java#L81
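For illustration, a minimal sketch of such a checkpoint-style trySplit, assuming (as in OffsetRangeTracker) that the subclass keeps its restriction in a mutable range field and records a nullable lastAttemptedOffset; the field names here are illustrative:

@Override
public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
  // Split at the first unclaimed offset; everything before it stays in the primary.
  long splitPos =
      (lastAttemptedOffset == null) ? range.getFrom() : lastAttemptedOffset + 1;
  OffsetRange residual = new OffsetRange(splitPos, range.getTo());
  // Mutate the tracker's own state so currentRestriction() now returns the
  // primary; checkDone() is then evaluated against [from, splitPos).
  this.range = new OffsetRange(range.getFrom(), splitPos);
  return SplitResult.of(this.range, residual);
}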
On Fri, Nov 27, 2020 at 11:48 PM Daniel Collins <[email protected]> wrote:

> Ah, I appear to have missed this line, which indicates that the current RestrictionTracker must be updated:
>
> "This invocation updates the {@link #currentRestriction()} to be the primary restriction, effectively having the current {@link DoFn.ProcessElement} execution responsible for performing the work that the primary restriction represents."
>
> On Sat, Nov 28, 2020 at 2:45 AM Daniel Collins <[email protected]> wrote:
>
>> This does not appear to work: the checkDone call, as far as I can tell, is made on the existing range, not the split range, based on the following error:
>>
>> Error message from worker: java.lang.IllegalStateException: Last attempted offset was 4601978 in range [2975759, 9223372036854775807), claiming work in [4601979, 9223372036854775807) was not attempted
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>
>> On Sat, Nov 28, 2020 at 2:18 AM Daniel Collins <[email protected]> wrote:
>>
>>> Can you confirm that the following implementation of trySplit will work as intended (from an OffsetRangeTracker subclass)?
>>>
>>> @Override
>>> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>>>   return SplitResult.of(
>>>       new OffsetRange(currentRestriction().getFrom(), lastClaimedOffset + 1),
>>>       new OffsetRange(lastClaimedOffset + 1, Long.MAX_VALUE));
>>> }
>>>
>>> > It would be nice to update the documentation if that's confusing.
>>>
>>> If you could please update this (if this is indeed the case) to confirm that it ensures that there will never be two unbounded restrictions sent to DoFns running at the same time using this pattern, that would be great.
>>>
>>> In addition, I'm not quite sure how this works. When the trySplit call occurs, it returns two OffsetRanges, which don't yet include information about claimed offsets. How is the first half of this converted to my OffsetRangeTracker subclass with offsets already claimed? Does the runtime call (and importantly, is it required to call):
>>>
>>> MyRestrictionTracker tracker = MyDoFn.newTracker(splitResult.getPrimary());
>>> tracker.tryClaim(previousClaimed);
>>> tracker.checkDone();
>>>
>>> On Sat, Nov 28, 2020 at 2:08 AM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> It would be nice to update the documentation if that's confusing.
>>>>
>>>> On Fri, Nov 27, 2020 at 11:05 PM Daniel Collins <[email protected]> wrote:
>>>>
>>>>> I think the documentation for trySplit() doesn't make it clear that it supports this use case. In particular, this section:
>>>>>
>>>>> > This invocation updates the {@link #currentRestriction()} to be the primary restriction, effectively having the current {@link DoFn.ProcessElement} execution responsible for performing the work that the primary restriction represents. The residual restriction will be executed in a separate {@link DoFn.ProcessElement} invocation (likely in a different process). The work performed by executing the primary and residual restrictions as separate {@link DoFn.ProcessElement} invocations MUST be equivalent to the work performed as if this split never occurred.
>>>>>
>>>>> implies that the runner will try to run both restrictions again on separate workers. This is not the behavior I am looking for, hence my confusion. Can we change the documentation here to make clear that checkDone will be called on the primary restriction in the output, to ensure that it is actually completed, if the trySplit call was triggered by a call to resume()?
>>>>>
>>>>> On Sat, Nov 28, 2020 at 1:58 AM Boyuan Zhang <[email protected]> wrote:
>>>>>
>>>>>> And if you look into the RestrictionTracker javadoc[1], it mentions what it means when you return null from trySplit.
>>>>>>
>>>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L104-L107
>>>>>>
>>>>>> On Fri, Nov 27, 2020 at 10:54 PM Boyuan Zhang <[email protected]> wrote:
>>>>>>
>>>>>>>> To the extent I can see, this never mentions the restriction that you <need> to implement a split() that returns a bounded restriction if returning resume() from an SDF. Nor does this restriction particularly make sense if the range being processed is itself unbounded? Perhaps you would consider not calling checkDone() on resume() if the restriction provided to the runner is unbounded, since it would be unreasonable to complete an unbounded restriction?
>>>>>>>
>>>>>>> It seems like you are not familiar with how Beam deals with resume(), so let's start from this part. Let's say that your SDF is processing a restriction of [0, MAX) and so far you have done tryClaim(5), and you want to return resume() at this point. When you return resume() from here, the Beam Java DoFn invoker will know you want to resume and will call your restrictionTracker.trySplit(fractionOfRemainder=0.0) to get the residual from [0, MAX). Ideally, your RestrictionTracker should return [0, 6) as your current restriction and [6, MAX) as the residual. Then the Beam Java DoFn invoker will call checkDone on the restriction [0, 6) to double-check that your SDF has called tryClaim(5), ensuring there is no data loss. Then the SDK will return the [6, MAX) restriction back to the runner (in your case, that's Dataflow) and the runner will reschedule [6, MAX) based on its scheduling strategy. That's why, if you want to use resume(), you need to implement trySplit. That's also why trySplit and checkDone make sense on an unbounded restriction.
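>>>>>>>
>>>>>>> Sketched as code (simplified; this is not the actual invoker source, and reschedule() is just a stand-in for what the runner does with the residual):
>>>>>>>
>>>>>>> // process() returned resume(), so the invoker checkpoints at fractionOfRemainder = 0.0:
>>>>>>> SplitResult<OffsetRange> split = tracker.trySplit(0.0); // primary [0, 6) stays the current restriction
>>>>>>> tracker.checkDone(); // passes only because tryClaim(5) claimed everything in [0, 6)
>>>>>>> reschedule(split.getResidual()); // residual [6, MAX) goes back to the runner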
>>>>>>>
>>>>>>>> Perhaps it would be better to explain in terms of why I'm trying to do this. If the subscription has not received any data in a while, or is receiving data infrequently, I want to enable Dataflow to scale down to 1 worker, but there will be no need to call "tryClaim" if there is no new data from Pub/Sub Lite. All I want to do is, if data is arriving infrequently, give Dataflow the opportunity to scale my job down. I'm not actually done with the data because, as new data can always be published in the future, we can't know that, and I'm trying to avoid needing to implement bounded reads to artificially produce sub-windows when an unbounded output is much more natural.
>>>>>>>
>>>>>>> So you are referring to the resume use case. Please note that even though you are returning resume() from your SDF, that doesn't mean Dataflow will guarantee the job will be downscaled to 1 worker. But resume() can indeed help you free some workers to process other work, compared to having your SDF do a busy wait.
>>>>>>>
>>>>>>>> This is good to know. So to rephrase, could I periodically call tryClaim(<last sent message or getFrom if none yet>) to yield control back to the runtime?
>>>>>>>
>>>>>>> You can do so by implementing RestrictionTracker.trySplit() and using resume().
>>>>>>>
>>>>>>> You may also want to take a look at the Kafka example[1]. Hope that is helpful.
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
>>>>>>>
>>>>>>> On Fri, Nov 27, 2020 at 10:34 PM Daniel Collins <[email protected]> wrote:
>>>>>>>
>>>>>>>> > There is an up-to-date SDF programming guide[1], and [2] talks specifically about SDF-initiated checkpointing
>>>>>>>>
>>>>>>>> To the extent I can see, this never mentions the restriction that you <need> to implement a split() that returns a bounded restriction if returning resume() from an SDF. Nor does this restriction particularly make sense if the range being processed is itself unbounded? Perhaps you would consider not calling checkDone() on resume() if the restriction provided to the runner is unbounded, since it would be unreasonable to complete an unbounded restriction?
>>>>>>>>
>>>>>>>> > It depends on the definition of no new data to enable rescheduling.
>>>>>>>>
>>>>>>>> Perhaps it would be better to explain in terms of why I'm trying to do this. If the subscription has not received any data in a while, or is receiving data infrequently, I want to enable Dataflow to scale down to 1 worker, but there will be no need to call "tryClaim" if there is no new data from Pub/Sub Lite. All I want to do is, if data is arriving infrequently, give Dataflow the opportunity to scale my job down. I'm not actually done with the data because, as new data can always be published in the future, we can't know that, and I'm trying to avoid needing to implement bounded reads to artificially produce sub-windows when an unbounded output is much more natural.
>>>>>>>>
>>>>>>>> > Please note that when an SDF is processing one element-restriction pair, the start of the restriction is never changed. You will always get the same offset when you call currentRestriction().getFrom().
>>>>>>>>
>>>>>>>> This is good to know. So to rephrase, could I periodically call tryClaim(<last sent message or getFrom if none yet>) to yield control back to the runtime?
>>>>>>>>
>>>>>>>> -Dan
>>>>>>>>
>>>>>>>> On Sat, Nov 28, 2020 at 1:21 AM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> > Okay, is this documented anywhere? In particular, https://s.apache.org/splittable-do-fn seems out of date, since it implies resume() should be returned when tryClaim returns false.
>>>>>>>>>
>>>>>>>>> There is an up-to-date SDF programming guide[1], and [2] talks specifically about SDF-initiated checkpointing. stop() should be returned when tryClaim returns false, whereas resume() is expected to be returned when the restriction is not fully processed and you want to defer processing to the future.
>>>>>>>>>
>>>>>>>>> > If this is the case, is there any way I can yield control to the runtime if I have no new data to enable rescheduling? For example, can I call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>>>>>>>
>>>>>>>>> It depends on the definition of "no new data to enable rescheduling". If you believe that you are done with the current restriction even though you have not reached the end of the restriction, you can explicitly say you are done with the current one by calling restrictionTracker.tryClaim(MAX_LONG) (or tryClaim(restriction.getTo()) if you are sure the end of your restriction has not been changed by any splitting). If you just want to re-process the rest of the restriction after a certain time, e.g., 5 mins, 30 mins and so on, you need to implement trySplit and return resume(duration) when you want to resume.
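>>>>>>>>>
>>>>>>>>> As a rough sketch of those two exits from process() (illustrative only; sourceIsPermanentlyExhausted() is a hypothetical check, not a real API):
>>>>>>>>>
>>>>>>>>> if (sourceIsPermanentlyExhausted()) {
>>>>>>>>>   // (a) Done forever: attempt a claim past the end so checkDone() passes
>>>>>>>>>   // (the claim itself returns false), then stop.
>>>>>>>>>   tracker.tryClaim(Long.MAX_VALUE);
>>>>>>>>>   return ProcessContinuation.stop();
>>>>>>>>> }
>>>>>>>>> // (b) Defer the rest: requires a working trySplit(); the runner
>>>>>>>>> // reschedules the residual after the delay.
>>>>>>>>> return ProcessContinuation.resume().withResumeDelay(Duration.standardMinutes(5));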
>>>>>>>>>
>>>>>>>>> Please note that when an SDF is processing one element-restriction pair, the start of the restriction is never changed. You will always get the same offset when you call currentRestriction().getFrom().
>>>>>>>>>
>>>>>>>>> [1] https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>>>>>>>> [2] https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint
>>>>>>>>>
>>>>>>>>> On Fri, Nov 27, 2020 at 10:07 PM Daniel Collins <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> > you can either never return resume() from your SDF or implement suitable trySplit() logic for your RestrictionTracker
>>>>>>>>>>
>>>>>>>>>> Okay, is this documented anywhere? In particular, https://s.apache.org/splittable-do-fn seems out of date, since it implies resume() should be returned when tryClaim returns false.
>>>>>>>>>>
>>>>>>>>>> If this is the case, is there any way I can yield control to the runtime if I have no new data to enable rescheduling? For example, can I call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>>>>>>>>
>>>>>>>>>> On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> > IIUC, this should never happen as long as I return null from trySplit. Is this not the case? (trySplit implementation below)
>>>>>>>>>>>
>>>>>>>>>>> I noticed that in your implementation you return null from your RestrictionTracker.trySplit. That means you cannot return resume() from your SplittableDoFn.process() body, since resume() means performing SplittableDoFn self-initiated checkpointing and deferring processing of the residual.
>>>>>>>>>>>
>>>>>>>>>>> In your case, you can either never return resume() from your SDF or implement suitable trySplit() logic for your RestrictionTracker. For example, KafkaIO is using GrowableOffsetRangeTracker[1] to track an infinite restriction.
>>>>>>>>>>>
>>>>>>>>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
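>>>>>>>>>>>
>>>>>>>>>>> A minimal sketch of wiring that tracker into a DoFn (fetchBacklogEndOffset() is a hypothetical backlog lookup, not a real API):
>>>>>>>>>>>
>>>>>>>>>>> @NewTracker
>>>>>>>>>>> public GrowableOffsetRangeTracker newTracker(@Restriction OffsetRange restriction) {
>>>>>>>>>>>   // The estimator reports where the growing range currently ends; the tracker
>>>>>>>>>>>   // uses it for backlog estimation and for computing split points.
>>>>>>>>>>>   return new GrowableOffsetRangeTracker(restriction.getFrom(), () -> fetchBacklogEndOffset());
>>>>>>>>>>> }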
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> > Please note that your current restriction might be changed to a finite restriction during processing one bundle if you do SplittableDoFn self-initiated checkpointing or any runner-issued splits
>>>>>>>>>>>>
>>>>>>>>>>>> IIUC, this should never happen as long as I return null from trySplit. Is this not the case? (trySplit implementation below)
>>>>>>>>>>>>
>>>>>>>>>>>> @Override
>>>>>>>>>>>> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>>>>>>>>>>>>   return null;
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> > what will you do if we reach the finally block?
>>>>>>>>>>>>
>>>>>>>>>>>> At that point an exception is being thrown out of the processElement function. The answer to that would be "what will the runtime do if an exception is thrown out of the processElement function".
>>>>>>>>>>>>
>>>>>>>>>>>> > Open a WIP PR
>>>>>>>>>>>>
>>>>>>>>>>>> I have, but I'm staging changes in a separate repo. See https://github.com/googleapis/java-pubsublite/pull/390 (although this incorporates other changes, see PubsubLitePartitionSdf.java <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1> and PubsubLiteOffsetRangeTracker.java <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67> for the SDF and restriction tracker implementations)
>>>>>>>>>>>>
>>>>>>>>>>>> -Dan
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Boyuan,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Responses inline.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > The checkDone is invoked by the SDK harness to guarantee that when you exit your SplittableDoFn.process you must have completed all the work in the current restriction
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is impossible with unbounded restrictions since, by definition, all work cannot be completed.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please note that your current restriction might be changed to a finite restriction during the processing of one bundle if you do SplittableDoFn self-initiated checkpointing or any runner-issued splits.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> > In your case, it seems like after you do tryClaim(3188439), you return stop() directly from your SplittableDoFn.process function
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is not true. The code in question is below. stop() is only returned if tryClaim returns false.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Dan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>>>>>>>>>>>>>   logger.atWarning().log("Failed to claim initial restriction for partition " + partition);
>>>>>>>>>>>>>>   return ProcessContinuation.stop();
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> sleepTimeRemaining = maxSleepTime;
>>>>>>>>>>>>>> Committer committer = committerSupplier.apply(partition);
>>>>>>>>>>>>>> committer.startAsync().awaitRunning();
>>>>>>>>>>>>>> try (PullSubscriber<SequencedMessage> subscriber =
>>>>>>>>>>>>>>     subscriberFactory.apply(partition, Offset.of(tracker.currentRestriction().getFrom()))) {
>>>>>>>>>>>>>>   while (true) {
>>>>>>>>>>>>>>     List<SequencedMessage> messages = doPoll(subscriber);
>>>>>>>>>>>>>>     // We polled for as long as possible; yield to the runtime to allow it to
>>>>>>>>>>>>>>     // reschedule us on a new task.
>>>>>>>>>>>>>>     if (messages.isEmpty()) {
>>>>>>>>>>>>>>       logger.atWarning().log("Yielding due to timeout on partition " + partition);
>>>>>>>>>>>>>>       return ProcessContinuation.resume();
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>     long lastOffset = Iterables.getLast(messages).offset().value();
>>>>>>>>>>>>>>     if (tracker.tryClaim(lastOffset)) {
>>>>>>>>>>>>>>       messages.forEach(
>>>>>>>>>>>>>>           message ->
>>>>>>>>>>>>>>               receiver.outputWithTimestamp(
>>>>>>>>>>>>>>                   message, new Instant(Timestamps.toMillis(message.publishTime()))));
>>>>>>>>>>>>>>       committer.commitOffset(Offset.of(lastOffset + 1)).get();
>>>>>>>>>>>>>>     } else {
>>>>>>>>>>>>>>       logger.atWarning().log("Stopping partition " + partition);
>>>>>>>>>>>>>>       return ProcessContinuation.stop();
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> } finally {
>>>>>>>>>>>>>>   committer.stopAsync().awaitTerminated();
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>
>>>>>>>>>>>>> From your code, what will you do if we reach the finally block? Would you like to open a WIP PR to show more details?
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Daniel,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The checkDone is invoked by the SDK harness to guarantee that when you exit your SplittableDoFn.process (either returning stop() or resume()), you must have completed all the work in the current restriction. This is one of the major ways for SplittableDoFn to prevent data loss.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In your case, it seems like after you do tryClaim(3188439), you return stop() directly from your SplittableDoFn.process function. That's not correct when working with a restriction and restriction tracker. You should either return resume() to perform a SplittableDoFn-initiated checkpoint and defer processing of restriction [3188439, 9223372036854775807), or return stop() only when tryClaim() has returned false.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into the following error on Dataflow with a RestrictionTracker returning UNBOUNDED from isBounded. It looks like calls are being made to `checkDone`, but looking at the documentation of `checkDone`, I don't think there's any rational thing I can do in this case. Does anyone know what should be done for this method?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The following exist in the RestrictionTracker javadoc:
>>>>>>>>>>>>>>>> - Must throw an exception with an informative error message, if there is still any unclaimed work remaining in the restriction. (There is; the restriction is unbounded.)
>>>>>>>>>>>>>>>> - {@link RestrictionTracker#checkDone} MUST succeed
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Dan
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> "Error message from worker: java.lang.IllegalStateException: Last attempted offset was 3188438 in range [1998348, 9223372036854775807), claiming work in [3188439, 9223372036854775807) was not attempted
>>>>>>>>>>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>>>>>>>>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>>>>>>>>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
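>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For reference, the failing check is roughly the following (paraphrased from OffsetRangeTracker.checkDone; null and empty-range handling omitted):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @Override
>>>>>>>>>>>>>>>> public void checkDone() throws IllegalStateException {
>>>>>>>>>>>>>>>>   // Fails unless the last attempted claim reached the end of the range, which
>>>>>>>>>>>>>>>>   // an unbounded [from, Long.MAX_VALUE) range can never satisfy unless the
>>>>>>>>>>>>>>>>   // range was first narrowed by a split or a claim was attempted past its end.
>>>>>>>>>>>>>>>>   checkState(
>>>>>>>>>>>>>>>>       lastAttemptedOffset >= range.getTo() - 1,
>>>>>>>>>>>>>>>>       "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
>>>>>>>>>>>>>>>>       lastAttemptedOffset, range, lastAttemptedOffset + 1, range.getTo());
>>>>>>>>>>>>>>>> }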
