And if you look into the RestrictionTracker javadoc[1], it mentions what it means when you return null from trySplit.
[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L104-L107

On Fri, Nov 27, 2020 at 10:54 PM Boyuan Zhang <[email protected]> wrote:

>> To the extent I can see, this never mentions the restriction that you <need> to implement a split() that returns a bounded restriction if returning resume() from an SDF. Nor does this restriction particularly make sense if the range being processed is itself unbounded? Perhaps you would consider not calling checkDone() on resume() if the restriction provided to the runner is unbounded, since it would be unreasonable to complete an unbounded restriction?

> It seems like you are not familiar with how Beam deals with resume(), so let's start from that part. Let's say that your SDF is processing a restriction of [0, MAX) and so far you have done tryClaim(5), and you want to return resume() at this point. When you return resume() from here, the Beam Java DoFn invoker knows you want to resume and calls your restrictionTracker.trySplit(fractionOfRemainder=0.0) to get the residual from [0, MAX). Ideally, your RestrictionTracker should return [0, 6) as your current restriction and [6, MAX) as the residual. The Beam Java DoFn invoker will then call checkDone on the restriction [0, 6) to double-check that your SDF has called tryClaim(5), ensuring there is no data loss. The SDK will then hand the [6, MAX) restriction back to the runner (in your case, Dataflow), and the runner will reschedule [6, MAX) based on its scheduling strategy. That's why, if you want to use resume(), you need to implement trySplit. That's also why trySplit and checkDone make sense on an unbounded restriction.

>> Perhaps it would be better to explain in terms of why I'm trying to do this. If the subscription has not received any data in a while, or is receiving data infrequently, I want to enable Dataflow to scale down to 1 worker, but there will be no need to call "tryClaim" if there is no new data from Pub/Sub Lite. All I want to do is, if data is arriving infrequently, give Dataflow the opportunity to scale my job down. I'm not actually done with the data: new data can always be published in the future, so we can't know that, and I'm trying to avoid needing to implement bounded reads to artificially produce sub-windows when an unbounded output is much more natural.

> So you are referring to the resume use case. Please note that even though you are returning resume() from your SDF, that doesn't mean Dataflow will guarantee that the job will be downscaled to 1 worker. But resume() can indeed help you free some workers to process other work, compared to having your SDF do a busy wait.

>> This is good to know. So to rephrase, could I periodically call tryClaim(<last sent message or getFrom if none yet>) to yield control back to the runtime?

> You can do so by implementing RestrictionTracker.trySplit() and using resume().

> You may also want to take a look at the Kafka example[1]. Hope that is helpful.

> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
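The mechanics described above can be exercised with Beam's stock OffsetRangeTracker; the following is a minimal standalone sketch of the checkpoint split (it is not the PubsubLiteOffsetRangeTracker discussed in this thread):

```
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;

public class ResumeCheckpointSketch {
  public static void main(String[] args) {
    // The [0, MAX) restriction from the explanation above, after tryClaim(5).
    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0, Long.MAX_VALUE));
    tracker.tryClaim(5L);

    // This is what the DoFn invoker asks of the tracker when process() returns resume().
    SplitResult<OffsetRange> split = tracker.trySplit(0.0);

    System.out.println(tracker.currentRestriction()); // primary:  [0, 6)
    System.out.println(split.getResidual());          // residual: [6, 9223372036854775807)

    // checkDone() now passes, because all work in the primary restriction was attempted.
    tracker.checkDone();
  }
}
```

A tracker that returns null from trySplit cannot produce that primary/residual pair, which is why resume() is off the table in that case.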
> On Fri, Nov 27, 2020 at 10:34 PM Daniel Collins <[email protected]> wrote:

>>> There is an up-to-date SDF programming guide[1], and [2] specifically talks about SDF-initiated checkpointing.

>> To the extent I can see, this never mentions the restriction that you <need> to implement a split() that returns a bounded restriction if returning resume() from an SDF. Nor does this restriction particularly make sense if the range being processed is itself unbounded? Perhaps you would consider not calling checkDone() on resume() if the restriction provided to the runner is unbounded, since it would be unreasonable to complete an unbounded restriction?

>>> It depends on the definition of "no new data to enable rescheduling".

>> Perhaps it would be better to explain in terms of why I'm trying to do this. If the subscription has not received any data in a while, or is receiving data infrequently, I want to enable Dataflow to scale down to 1 worker, but there will be no need to call "tryClaim" if there is no new data from Pub/Sub Lite. All I want to do is, if data is arriving infrequently, give Dataflow the opportunity to scale my job down. I'm not actually done with the data: new data can always be published in the future, so we can't know that, and I'm trying to avoid needing to implement bounded reads to artificially produce sub-windows when an unbounded output is much more natural.

>>> Please note that when an SDF is processing one element/restriction pair, the start of the restriction is never changed. You will always get the same offset when you call currentRestriction().getFrom().

>> This is good to know. So to rephrase, could I periodically call tryClaim(<last sent message or getFrom if none yet>) to yield control back to the runtime?

>> -Dan

>> On Sat, Nov 28, 2020 at 1:21 AM Boyuan Zhang <[email protected]> wrote:

>>>> Okay, is this documented anywhere? In particular, https://s.apache.org/splittable-do-fn seems out of date, since it implies resume() should be returned when tryClaim returns false.

>>> There is an up-to-date SDF programming guide[1], and [2] specifically talks about SDF-initiated checkpointing. stop() should be returned when tryClaim returns false, whereas resume() is expected when the restriction is not fully processed and you want to defer the remaining processing to the future.

>>>> If this is the case, is there any way I can yield control to the runtime if I have no new data to enable rescheduling? For example, can I call tracker.tryClaim(tracker.currentRestriction().getFrom())?

>>> It depends on the definition of "no new data to enable rescheduling".
>>> If you believe that you are done with the current restriction even though you have not reached the end of it, you can explicitly say you are done with the current one by calling restrictionTracker.tryClaim(Long.MAX_VALUE) (or tryClaim(restriction.getTo()) if you are sure the end of your restriction has not been changed by any splitting).
>>> If you just want to re-process the rest of the restriction after a certain time, e.g. 5 minutes, 30 minutes and so on, you need to implement trySplit and return resume(duration) when you want to resume.

>>> Please note that when an SDF is processing one element/restriction pair, the start of the restriction is never changed. You will always get the same offset when you call currentRestriction().getFrom().

>>> [1]
>>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>> [2]
>>> https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint
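Putting those two options into code, a skeletal SDF process() might look roughly like the sketch below. This is illustrative only: fetchNextAvailableOffset() is a hypothetical stand-in for a Pub/Sub Lite poll, and the restriction tracker is assumed to support checkpointing via trySplit (for example, the default OffsetRangeTracker or the GrowableOffsetRangeTracker referenced later in the thread).

```
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Duration;

// Sketch only: an unbounded-per-element SDF reading offsets for a partition.
@DoFn.UnboundedPerElement
class SketchPartitionSdf extends DoFn<String, Long> {

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element String partition) {
    return new OffsetRange(0, Long.MAX_VALUE);
  }

  @ProcessElement
  public ProcessContinuation processElement(
      RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Long> receiver) {
    // Hypothetical poll: returns the newest available offset, or null if nothing new.
    Long nextOffset = fetchNextAvailableOffset(tracker.currentRestriction().getFrom());
    if (nextOffset == null) {
      // "No new data": checkpoint through the tracker's trySplit and ask to be
      // rescheduled later instead of busy-waiting inside this bundle.
      return ProcessContinuation.resume().withResumeDelay(Duration.standardMinutes(5));
    }
    if (!tracker.tryClaim(nextOffset)) {
      // The restriction was truncated by a split, so the rest is someone else's work.
      return ProcessContinuation.stop();
    }
    receiver.output(nextOffset);
    // Alternatively, claiming tracker.currentRestriction().getTo() (or Long.MAX_VALUE)
    // would declare the whole restriction done so that checkDone() passes on stop().
    return ProcessContinuation.resume();
  }

  private Long fetchNextAvailableOffset(long fromOffset) {
    return null; // placeholder; a real implementation would query the source
  }
}
```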
>>> On Fri, Nov 27, 2020 at 10:07 PM Daniel Collins <[email protected]> wrote:

>>>>> you can either never return resume() from your SDF or implement suitable trySplit() logic for your RestrictionTracker

>>>> Okay, is this documented anywhere? In particular, https://s.apache.org/splittable-do-fn seems out of date, since it implies resume() should be returned when tryClaim returns false.

>>>> If this is the case, is there any way I can yield control to the runtime if I have no new data to enable rescheduling? For example, can I call tracker.tryClaim(tracker.currentRestriction().getFrom())?

>>>> On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <[email protected]> wrote:

>>>>>> IIUC, this should never happen as long as I return null from trySplit. Is this not the case? (trySplit implementation below)

>>>>> I noticed that in your implementation you return null from your RestrictionTracker.trySplit. That means you cannot return resume() from your SplittableDoFn.process() body, since resume() means performing SplittableDoFn self-initiated checkpointing and deferring processing of the residual.

>>>>> In your case, you can either never return resume() from your SDF or implement suitable trySplit() logic for your RestrictionTracker. For example, KafkaIO uses GrowableOffsetRangeTracker[1] to track an infinite restriction.

>>>>> [1]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java

>>>>> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <[email protected]> wrote:

>>>>>>> Please note that your current restriction might be changed to a finite restriction during processing one bundle if you do SplittableDoFn self-initiated checkpointing or any runner-issued splits

>>>>>> IIUC, this should never happen as long as I return null from trySplit. Is this not the case? (trySplit implementation below)

>>>>>> @Override
>>>>>> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>>>>>>   return null;
>>>>>> }

>>>>>>> what will you do if we reach the finally block?

>>>>>> At that point an exception is being thrown out of the processElement function. The answer to that would be "what will the runtime do if an exception is thrown out of the processElement function?"

>>>>>>> Open a WIP PR

>>>>>> I have, but I'm staging changes in a separate repo. See https://github.com/googleapis/java-pubsublite/pull/390 (although this incorporates other changes, see PubsubLitePartitionSdf.java <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1> and PubsubLiteOffsetRangeTracker.java <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67> for the SDF and restriction tracker implementations).

>>>>>> -Dan
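For reference, the GrowableOffsetRangeTracker approach mentioned above wires in roughly as follows. This is a sketch modeled on the KafkaIO pattern, not code from the linked PR; fetchPartitionHeadOffset() is a hypothetical stand-in for querying the partition's current head offset.

```
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

// Sketch only: give the SDF a tracker whose end "grows" with the source, so that
// trySplit (and therefore resume()) and checkDone work on an effectively unbounded range.
class GrowableTrackerSketch extends DoFn<String, Long> {

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element String partition) {
    return new OffsetRange(0, Long.MAX_VALUE);
  }

  @NewTracker
  public GrowableOffsetRangeTracker newTracker(@Restriction OffsetRange restriction) {
    // The estimator supplies a finite "current end", which is what allows the tracker
    // to checkpoint and to report backlog for scheduling decisions.
    GrowableOffsetRangeTracker.RangeEndEstimator estimator = this::fetchPartitionHeadOffset;
    return new GrowableOffsetRangeTracker(restriction.getFrom(), estimator);
  }

  private long fetchPartitionHeadOffset() {
    return 0L; // placeholder; a real implementation would ask the source for its head offset
  }

  @ProcessElement
  public ProcessContinuation processElement(
      RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Long> receiver) {
    return ProcessContinuation.stop(); // body elided in this sketch
  }
}
```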
>>>>>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <[email protected]> wrote:

>>>>>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <[email protected]> wrote:

>>>>>>>> Hello Boyuan,

>>>>>>>> Responses inline.

>>>>>>>>> The checkDone is invoked by the SDK harness to guarantee that when you exit your SplittableDoFn.process you must have completed all the work in the current restriction

>>>>>>>> This is impossible with unbounded restrictions since, by definition, all work cannot be completed.

>>>>>>> Please note that your current restriction might be changed to a finite restriction while processing one bundle if you do SplittableDoFn self-initiated checkpointing or if the runner issues any splits.

>>>>>>>>> In your case, it seems like after you do tryClaim(3188439), you return stop() directly from your SplittableDoFn.process function

>>>>>>>> This is not true. The code in question is below. stop() is only returned if tryClaim returns false.

>>>>>>>> -Dan

```
if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
  logger.atWarning().log("Failed to claim initial restriction for partition " + partition);
  return ProcessContinuation.stop();
}
sleepTimeRemaining = maxSleepTime;
Committer committer = committerSupplier.apply(partition);
committer.startAsync().awaitRunning();
try (PullSubscriber<SequencedMessage> subscriber =
    subscriberFactory.apply(partition, Offset.of(tracker.currentRestriction().getFrom()))) {
  while (true) {
    List<SequencedMessage> messages = doPoll(subscriber);
    // We polled for as long as possible; yield to the runtime to allow it to reschedule us
    // on a new task.
    if (messages.isEmpty()) {
      logger.atWarning().log("Yielding due to timeout on partition " + partition);
      return ProcessContinuation.resume();
    }
    long lastOffset = Iterables.getLast(messages).offset().value();
    if (tracker.tryClaim(lastOffset)) {
      messages.forEach(
          message ->
              receiver.outputWithTimestamp(
                  message, new Instant(Timestamps.toMillis(message.publishTime()))));
      committer.commitOffset(Offset.of(lastOffset + 1)).get();
    } else {
      logger.atWarning().log("Stopping partition " + partition);
      return ProcessContinuation.stop();
    }
  }
} finally {
  committer.stopAsync().awaitTerminated();
}
```

>>>>>>> From your code, what will you do if we reach the finally block? Would you like to open a WIP PR to show more details?
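The point above about the restriction becoming finite mid-bundle can also be seen with the stock OffsetRangeTracker; a small illustrative sketch (not the PubsubLiteOffsetRangeTracker):

```
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

public class TruncatedRestrictionSketch {
  public static void main(String[] args) {
    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0, Long.MAX_VALUE));
    tracker.tryClaim(10L);

    // A checkpoint or runner-initiated split truncates the current restriction to [0, 11).
    tracker.trySplit(0.0);

    // A later claim beyond the new end now fails, which is the case where the process()
    // body above correctly returns stop(); the residual [11, MAX) is rescheduled elsewhere.
    boolean claimed = tracker.tryClaim(42L); // false
    System.out.println(claimed);
    tracker.checkDone(); // passes: nothing unclaimed remains in the truncated range
  }
}
```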
>>>>>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <[email protected]> wrote:

>>>>>>>>> Hi Daniel,

>>>>>>>>> The checkDone is invoked by the SDK harness to guarantee that when you exit your SplittableDoFn.process (whether you return stop() or resume()), you must have completed all the work in the current restriction. This is one of the major ways for SplittableDoFn to prevent data loss.

>>>>>>>>> In your case, it seems like after you do tryClaim(3188439), you return stop() directly from your SplittableDoFn.process function. That's not a correct way to work with a restriction and restriction tracker. You should either return resume() to perform a SplittableDoFn-initiated checkpoint and defer processing of the restriction [3188439, 9223372036854775807), or return stop() only when tryClaim() has returned false.

>>>>>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <[email protected]> wrote:

>>>>>>>>>> Hello all,

>>>>>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into the following error on Dataflow with a RestrictionTracker returning UNBOUNDED from isBounded. It looks like calls are being made to `checkDone`, but looking at the documentation of `checkDone`, I don't think there's any rational thing I can do in this case. Does anyone know what should be done for this method?

>>>>>>>>>> The following exist in the RestrictionTracker javadoc:
>>>>>>>>>> - Must throw an exception with an informative error message, if there is still any unclaimed work remaining in the restriction. (There is; the restriction is unbounded.)
>>>>>>>>>> - {@link RestrictionTracker#checkDone} MUST succeed

>>>>>>>>>> Thanks,

>>>>>>>>>> Dan

>>>>>>>>>> "Error message from worker: java.lang.IllegalStateException: Last attempted offset was 3188438 in range [1998348, 9223372036854775807), claiming work in [3188439, 9223372036854775807) was not attempted
>>>>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
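The failure in the quoted stack trace can be reproduced in isolation with Beam's OffsetRangeTracker (the class the PubsubLite tracker delegates to in the trace); a minimal sketch:

```
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

public class CheckDoneFailureSketch {
  public static void main(String[] args) {
    // An "unbounded" restriction expressed as [0, Long.MAX_VALUE).
    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0, Long.MAX_VALUE));

    // Claim one offset, then exit without checkpointing the rest of the range.
    tracker.tryClaim(5L);

    // This is what the SDK harness calls when process() returns; it throws an
    // IllegalStateException of the same shape as the error above: last attempted offset
    // was 5 in range [0, 9223372036854775807), work in [6, ...) was not attempted.
    tracker.checkDone();
  }
}
```

Either claiming through to the end of the restriction or checkpointing via trySplit before returning avoids this failure.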
