This does not appear to work: the checkDone call, as far as I can tell, is made on the existing range, not the split range, based on the following error:
Error message from worker: java.lang.IllegalStateException: Last attempted offset was 4601978 in range [2975759, 9223372036854775807), claiming work in [4601979, 9223372036854775807) was not attempted
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)

On Sat, Nov 28, 2020 at 2:18 AM Daniel Collins <[email protected]> wrote:

> Can you confirm that the following implementation of trySplit will work as intended (from an OffsetRangeTracker subclass)?

> @Override
> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>   return SplitResult.of(
>       new OffsetRange(currentRestriction().getFrom(), lastClaimedOffset + 1),
>       new OffsetRange(lastClaimedOffset + 1, Long.MAX_VALUE));
> }

> > It would be nice to update the documentation if that's confusing.

> If you could please update this (if this is indeed the case) to confirm that it ensures that there will never be two unbounded restrictions sent to DoFns running at the same time using this pattern, that would be great.

> In addition, I'm not quite sure how this works. When the trySplit call occurs, it returns two OffsetRanges, which don't yet include information about claimed offsets. How is the first half of this converted to my OffsetRangeTracker subclass with offsets already claimed? Does the runtime call (and importantly, is it required to call):

> MyRestrictionTracker tracker = MyDoFn.newTracker(splitResult.getPrimary());
> tracker.tryClaim(previousClaimed);
> tracker.checkDone();

> On Sat, Nov 28, 2020 at 2:08 AM Boyuan Zhang <[email protected]> wrote:

>> It would be nice to update the documentation if that's confusing.

>> On Fri, Nov 27, 2020 at 11:05 PM Daniel Collins <[email protected]> wrote:

>>> I think the documentation for trySplit() doesn't make it clear that it supports this use case. In particular, this section:

>>> > This invocation updates the {@link #currentRestriction()} to be the primary restriction effectively having the current {@link DoFn.ProcessElement} execution responsible for performing the work that the primary restriction represents. The residual restriction will be executed in a separate {@link DoFn.ProcessElement} invocation (likely in a different process). The work performed by executing the primary and residual restrictions as separate {@link DoFn.ProcessElement} invocations MUST be equivalent to the work performed as if this split never occurred.

>>> implies that the runner will try to run both restrictions again on separate workers. This is not the behavior I am looking for, hence my confusion. Can we change the documentation here to make clear that checkDone will be called on the primary restriction in the output, to ensure that it is actually completed, if the trySplit call was triggered by a call to resume()?

>>> On Sat, Nov 28, 2020 at 1:58 AM Boyuan Zhang <[email protected]> wrote:

>>>> And if you look into the RestrictionTracker javadoc[1], it mentions what it means when you return null from trySplit.

>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L104-L107
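A note on the failure at the top of this thread: the exception reports the original range [2975759, 9223372036854775807), which is the symptom of a trySplit that returns a fresh primary/residual pair without updating the tracker's own range; checkDone() then still validates against the unsplit range. Below is a minimal sketch of a range-updating variant. It assumes the subclass can assign OffsetRangeTracker's range field (lastClaimedOffset is evidently reachable, given the snippet above); if range is not assignable, the subclass has to keep its own copy and override currentRestriction().

```
@Override
public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
  long from = currentRestriction().getFrom();
  // Split at the first unclaimed offset; if nothing was claimed yet, the
  // primary is empty and all of the work moves to the residual.
  long splitOffset = (lastClaimedOffset == null) ? from : lastClaimedOffset + 1;
  OffsetRange residual = new OffsetRange(splitOffset, Long.MAX_VALUE);
  // The step the snippet above is missing: shrink this tracker's own range
  // to the primary, so a later checkDone() validates [from, splitOffset)
  // instead of [from, Long.MAX_VALUE).
  this.range = new OffsetRange(from, splitOffset);
  return SplitResult.of(this.range, residual);
}
```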
>>>> On Fri, Nov 27, 2020 at 10:54 PM Boyuan Zhang <[email protected]> wrote:

>>>>>> To the extent I can see, this never mentions the restriction that you <need> to implement a split() that returns a bounded restriction if returning resume() from an SDF. Nor does this restriction particularly make sense if the range being processed is itself unbounded? Perhaps you would consider not calling checkDone() on resume() if the restriction provided to the runner is unbounded, since it would be unreasonable to complete an unbounded restriction?

>>>>> It seems like you are not familiar with how beam deals with resume(), so let's start from this part. Let's say that your SDF is processing a restriction of [0, MAX) and so far you have done tryClaim(5), and you want to return resume() at this point. When you return resume() from here, the beam Java DoFn invoker will know you want to resume and call your restrictionTracker.trySplit(fractionOfRemainder=0.0) to get the residual from [0, MAX). Ideally, your RestrictionTracker should return [0, 6) as your current restriction and [6, MAX) as the residual. Then the beam Java DoFn invoker will call checkDone on the restriction [0, 6) to double-check that your SDF has called tryClaim(5), to ensure there is no data loss. Then the SDK will return the [6, MAX) restriction back to the runner (in your case, that's Dataflow) and the runner will reschedule [6, MAX) based on its scheduling strategy. That's why, if you want to use resume(), you need to implement trySplit. That's also why trySplit and checkDone make sense on an unbounded restriction.

>>>>>> Perhaps it would be better to explain in terms of why I'm trying to do this. If the subscription has not received any data in a while, or is receiving data infrequently, I want to enable dataflow to scale down to 1 worker, but there will be no need to call "tryClaim" if there is no new data from Pub/Sub Lite. All I want to do is, if data is arriving infrequently, give dataflow the opportunity to scale my job down. I'm not actually done with the data because, as new data can always be published in the future, we can't know that, and I'm trying to avoid needing to implement bounded reads to artificially produce sub-windows when an unbounded output is much more natural.

>>>>> So you are referring to the resume use case. Please note that even though you are returning resume() from your SDF, that doesn't mean Dataflow will guarantee that the job will be downscaled to 1 worker. But resume() can indeed help you free some workers to process other work, compared to having your SDF do a busy wait.

>>>>>> This is good to know. So to rephrase, could I periodically call tryClaim(<last sent message or getFrom if none yet>) to yield control back to the runtime?

>>>>> You can do so by implementing RestrictionTracker.trySplit() and using resume().

>>>>> You may also want to take a look at the Kafka example[1]. Hope that is helpful.

>>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
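Boyuan's [0, MAX) walkthrough above can be replayed against the stock OffsetRangeTracker; a small sketch of the sequence the Java DoFn invoker performs when process() returns resume(), with the expected state noted in comments:

```
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;

public class ResumeFlowSketch {
  public static void main(String[] args) {
    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0, Long.MAX_VALUE));
    tracker.tryClaim(5L); // the SDF has processed offsets up to and including 5

    // On resume(), the invoker asks the tracker for a zero-size split:
    SplitResult<OffsetRange> split = tracker.trySplit(0.0);
    // split.getPrimary()  == [0, 6), which also becomes currentRestriction()
    // split.getResidual() == [6, MAX), handed back to the runner to reschedule

    tracker.checkDone(); // passes: offset 5 was attempted and the range is now [0, 6)
  }
}
```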
>>>>> On Fri, Nov 27, 2020 at 10:34 PM Daniel Collins <[email protected]> wrote:

>>>>>> > There is an up-to-date SDF programming guide[1] and [2] specifically talks about SDF-initiated checkpointing

>>>>>> To the extent I can see, this never mentions the restriction that you <need> to implement a split() that returns a bounded restriction if returning resume() from an SDF. Nor does this restriction particularly make sense if the range being processed is itself unbounded? Perhaps you would consider not calling checkDone() on resume() if the restriction provided to the runner is unbounded, since it would be unreasonable to complete an unbounded restriction?

>>>>>> > It depends on the definition of no new data to enable rescheduling.

>>>>>> Perhaps it would be better to explain in terms of why I'm trying to do this. If the subscription has not received any data in a while, or is receiving data infrequently, I want to enable dataflow to scale down to 1 worker, but there will be no need to call "tryClaim" if there is no new data from Pub/Sub Lite. All I want to do is, if data is arriving infrequently, give dataflow the opportunity to scale my job down. I'm not actually done with the data because, as new data can always be published in the future, we can't know that, and I'm trying to avoid needing to implement bounded reads to artificially produce sub-windows when an unbounded output is much more natural.

>>>>>> > Please note that when an SDF is processing one element-restriction pair, the start of the restriction is never changed. You will always get the same offset when you call currentRestriction().getFrom().

>>>>>> This is good to know. So to rephrase, could I periodically call tryClaim(<last sent message or getFrom if none yet>) to yield control back to the runtime?

>>>>>> -Dan

>>>>>> On Sat, Nov 28, 2020 at 1:21 AM Boyuan Zhang <[email protected]> wrote:

>>>>>>> > Okay, is this documented anywhere? In particular, https://s.apache.org/splittable-do-fn seems out of date, since it implies resume() should be returned when tryClaim returns false.

>>>>>>> There is an up-to-date SDF programming guide[1] and [2] specifically talks about SDF-initiated checkpointing. And stop() should be returned when tryClaim returns false, whereas resume() is expected to be returned when the restriction is not fully processed and you want to defer processing to the future.

>>>>>>> > If this is the case, is there any way I can yield control to the runtime if I have no new data, to enable rescheduling? For example, can I call tracker.tryClaim(tracker.currentRestriction().getFrom())?

>>>>>>> It depends on the definition of "no new data to enable rescheduling."
>>>>>>> If you believe that you are done with the current restriction even though you have not reached the end of the restriction, you can explicitly say you are done with the current one by calling restrictionTracker.tryClaim(MAX_LONG) (or tryClaim(restriction.getTo()) if you are sure the end of your restriction is not changed by any splitting).
>>>>>>> If you just want to re-process the rest of the restriction after a certain time, e.g., 5 mins, 30 mins and so on, you need to implement trySplit and return resume(duration) when you want to resume.

>>>>>>> Please note that when an SDF is processing one element-restriction pair, the start of the restriction is never changed. You will always get the same offset when you call currentRestriction().getFrom().

>>>>>>> [1] https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>>>>>> [2] https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint
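The two options Boyuan describes look roughly like this inside a process() body; a sketch only, where tracker is the method's RestrictionTracker<OffsetRange, Long> parameter, Duration is org.joda.time.Duration, and doneForGood / nothingToReadRightNow are hypothetical flags:

```
// Option 1: declare the unbounded restriction finished. Claiming past the end
// records the attempt and returns false, after which checkDone() passes.
if (doneForGood) {
  tracker.tryClaim(Long.MAX_VALUE);
  return ProcessContinuation.stop();
}

// Option 2: defer the rest of the restriction and come back later. This
// requires a working trySplit(), because the invoker checkpoints the tracker
// before handing the residual back to the runner.
if (nothingToReadRightNow) {
  return ProcessContinuation.resume().withResumeDelay(Duration.standardMinutes(5));
}
```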
>>>>>>> On Fri, Nov 27, 2020 at 10:07 PM Daniel Collins <[email protected]> wrote:

>>>>>>>> > you can either never return resume() from your SDF or implement suitable trySplit() logic for your RestrictionTracker

>>>>>>>> Okay, is this documented anywhere? In particular, https://s.apache.org/splittable-do-fn seems out of date, since it implies resume() should be returned when tryClaim returns false.

>>>>>>>> If this is the case, is there any way I can yield control to the runtime if I have no new data, to enable rescheduling? For example, can I call tracker.tryClaim(tracker.currentRestriction().getFrom())?

>>>>>>>> On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <[email protected]> wrote:

>>>>>>>>> > IIUC, this should never happen as long as I return null from trySplit. Is this not the case? (trySplit implementation below)

>>>>>>>>> I noticed that in your implementation you return null from your RestrictionTracker.trySplit. That means you cannot return resume() from your SplittableDoFn.process() body, since resume() means performing SplittableDoFn self-initiated checkpointing and deferring processing of residuals.

>>>>>>>>> In your case, you can either never return resume() from your SDF or implement suitable trySplit() logic for your RestrictionTracker. For example, KafkaIO is using GrowableOffsetRangeTracker[1] to track an infinite restriction.

>>>>>>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
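GrowableOffsetRangeTracker tracks [start, Long.MAX_VALUE) but consults a RangeEndEstimator whenever it needs a split point or a progress estimate; KafkaIO's ReadFromKafkaDoFn wires one up against the partition's latest offset. A minimal sketch of the same wiring in a @NewTracker method, where fetchBacklogEndOffset() is a hypothetical helper standing in for a source-specific backlog query:

```
@NewTracker
public GrowableOffsetRangeTracker newTracker(@Restriction OffsetRange restriction) {
  // The estimator supplies a best-effort end offset (e.g. the current backlog
  // head), which makes splits meaningful on an otherwise infinite range.
  return new GrowableOffsetRangeTracker(
      restriction.getFrom(), () -> fetchBacklogEndOffset());
}
```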
>>>>>>>>> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <[email protected]> wrote:

>>>>>>>>>> > Please note that your current restriction might be changed to a finite restriction while processing one bundle if you do SplittableDoFn self-initiated checkpointing or any runner-issued splits

>>>>>>>>>> IIUC, this should never happen as long as I return null from trySplit. Is this not the case? (trySplit implementation below)

>>>>>>>>>> @Override
>>>>>>>>>> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>>>>>>>>>>   return null;
>>>>>>>>>> }

>>>>>>>>>> > what will you do if we reach the finally block?

>>>>>>>>>> At that point an exception is being thrown out of the processElement function. The answer to that would be "what will the runtime do if an exception is thrown out of the processElement function?"

>>>>>>>>>> > Open a WIP PR

>>>>>>>>>> I have, but I'm staging changes in a separate repo. See https://github.com/googleapis/java-pubsublite/pull/390 (although this incorporates other changes, see PubsubLitePartitionSdf.java <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1> and PubsubLiteOffsetRangeTracker.java <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67> for the SDF and restriction tracker implementations)

>>>>>>>>>> -Dan

>>>>>>>>>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <[email protected]> wrote:

>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <[email protected]> wrote:

>>>>>>>>>>>> Hello Boyuan,

>>>>>>>>>>>> Responses inline.

>>>>>>>>>>>> > The checkDone is invoked by the SDK harness to guarantee that when you exit your SplittableDoFn.process you must have completed all the work in the current restriction

>>>>>>>>>>>> This is impossible with unbounded restrictions since, by definition, all work cannot be completed.

>>>>>>>>>>> Please note that your current restriction might be changed to a finite restriction while processing one bundle if you do SplittableDoFn self-initiated checkpointing or any runner-issued splits.

>>>>>>>>>>>> > In your case, it seems like after you do tryClaim(3188439), you return stop() directly from your SplittableDoFn.process function

>>>>>>>>>>>> This is not true. The code in question is below. stop() is only returned if tryClaim returns false.

>>>>>>>>>>>> -Dan

>>>>>>>>>>>> ```
>>>>>>>>>>>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>>>>>>>>>>>   logger.atWarning().log("Failed to claim initial restriction for partition " + partition);
>>>>>>>>>>>>   return ProcessContinuation.stop();
>>>>>>>>>>>> }
>>>>>>>>>>>> sleepTimeRemaining = maxSleepTime;
>>>>>>>>>>>> Committer committer = committerSupplier.apply(partition);
>>>>>>>>>>>> committer.startAsync().awaitRunning();
>>>>>>>>>>>> try (PullSubscriber<SequencedMessage> subscriber =
>>>>>>>>>>>>     subscriberFactory.apply(partition, Offset.of(tracker.currentRestriction().getFrom()))) {
>>>>>>>>>>>>   while (true) {
>>>>>>>>>>>>     List<SequencedMessage> messages = doPoll(subscriber);
>>>>>>>>>>>>     // We polled for as long as possible, yield to the runtime to allow it to
>>>>>>>>>>>>     // reschedule us on a new task.
>>>>>>>>>>>>     if (messages.isEmpty()) {
>>>>>>>>>>>>       logger.atWarning().log("Yielding due to timeout on partition " + partition);
>>>>>>>>>>>>       return ProcessContinuation.resume();
>>>>>>>>>>>>     }
>>>>>>>>>>>>     long lastOffset = Iterables.getLast(messages).offset().value();
>>>>>>>>>>>>     if (tracker.tryClaim(lastOffset)) {
>>>>>>>>>>>>       messages.forEach(
>>>>>>>>>>>>           message ->
>>>>>>>>>>>>               receiver.outputWithTimestamp(
>>>>>>>>>>>>                   message, new Instant(Timestamps.toMillis(message.publishTime()))));
>>>>>>>>>>>>       committer.commitOffset(Offset.of(lastOffset + 1)).get();
>>>>>>>>>>>>     } else {
>>>>>>>>>>>>       logger.atWarning().log("Stopping partition " + partition);
>>>>>>>>>>>>       return ProcessContinuation.stop();
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>> } finally {
>>>>>>>>>>>>   committer.stopAsync().awaitTerminated();
>>>>>>>>>>>> }
>>>>>>>>>>>> ```

>>>>>>>>>>> From your code, what will you do if we reach the finally block? Would you like to open a WIP PR to show more details?

>>>>>>>>>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <[email protected]> wrote:

>>>>>>>>>>>>> Hi Daniel,

>>>>>>>>>>>>> The checkDone is invoked by the SDK harness to guarantee that when you exit your SplittableDoFn.process (whether you return stop() or resume()), you must have completed all the work in the current restriction. This is one of the major ways for SplittableDoFn to prevent data loss.

>>>>>>>>>>>>> In your case, it seems like after you do tryClaim(3188439), you return stop() directly from your SplittableDoFn.process function. That's not a correct way of working with a restriction and restriction tracker. You should either return resume() to perform a SplittableDoFn-initiated checkpoint and defer processing restriction [3188439, 9223372036854775807), or you should return stop() only when tryClaim() returns false.
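The contract Boyuan describes is the canonical SDF claim loop; a sketch with simplified types, where MyRecord, readRecordAt(), and wantToDeferRemainder() are hypothetical stand-ins:

```
@ProcessElement
public ProcessContinuation processElement(
    RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<MyRecord> receiver) {
  for (long offset = tracker.currentRestriction().getFrom(); ; ++offset) {
    if (!tracker.tryClaim(offset)) {
      return ProcessContinuation.stop(); // stop() only after a failed claim
    }
    receiver.output(readRecordAt(offset));
    if (wantToDeferRemainder()) {
      // Defer the rest: the invoker will trySplit() the tracker and then
      // checkDone() the primary, so trySplit must not return null here.
      return ProcessContinuation.resume();
    }
  }
}
```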
>>>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <[email protected]> wrote:

>>>>>>>>>>>>>> Hello all,

>>>>>>>>>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into the following error on Dataflow with a RestrictionTracker returning UNBOUNDED from isBounded. It looks like calls are being made to `checkDone`, but looking at the documentation of `checkDone`, I don't think there's any rational thing I can do in this case. Does anyone know what should be done for this method?

>>>>>>>>>>>>>> The following exist in the RestrictionTracker javadoc:
>>>>>>>>>>>>>> - Must throw an exception with an informative error message, if there is still any unclaimed work remaining in the restriction. (there is, the restriction is unbounded)
>>>>>>>>>>>>>> - {@link RestrictionTracker#checkDone} MUST succeed

>>>>>>>>>>>>>> Thanks,

>>>>>>>>>>>>>> Dan

>>>>>>>>>>>>>> "Error message from worker: java.lang.IllegalStateException: Last attempted offset was 3188438 in range [1998348, 9223372036854775807), claiming work in [3188439, 9223372036854775807) was not attempted
>>>>>>>>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>>>>>>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>>>>>>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
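The OffsetRangeTracker.checkDone frame in this trace enforces that the last attempted offset reached the end of the tracker's current range; a paraphrased sketch of the check, not the verbatim Beam source:

```
public void checkDone() throws IllegalStateException {
  if (range.getFrom() == range.getTo()) {
    return; // an empty range is trivially done
  }
  checkState(lastAttemptedOffset != null, "No work was attempted in non-empty range %s", range);
  checkState(
      lastAttemptedOffset >= range.getTo() - 1,
      "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
      lastAttemptedOffset, range, lastAttemptedOffset + 1, range.getTo());
}
```

With getTo() equal to Long.MAX_VALUE this can only pass if the tracker either claims past the end (the tryClaim(Long.MAX_VALUE) option above) or shrinks its range inside trySplit before checkDone runs, which is what the rest of the thread converges on.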
