> > To the extent I can see, this never mentions the restriction that you > <need> to implement a split() that returns a bounded restriction if > returning resume() from an SDF. Nor does this restriction particularly make > sense if the range being processed is itself unbounded? Perhaps you would > consider not calling checkDone() on resume() if the restriction provided to > the runner is unbounded since it would be unreasonable to complete an > unbounded restriction?
It seems like you are not familiar with how beam deals with resume() so let's start from this part. Let's say that your SDF is processing a restriction of [0, MAX) and so far you have done tryClaim(5), and you want to return resume() at this point. When you return resume() from here, the beam Java DoFn invoker will know you want to resume and call your restrictionTracker.trySplit(fractionOfRemainder=0.0) to get the residual from [0, Max). Ideally, your RestrictionTracker should return [0, 6) as your current restriction and [6, MAX) as the residual. Then beam Java DoFn invoker will call checkDone on the restriction [0, 6) to double check your SDF has called tryClaim(5) to ensure there is no data loss. Then the SDK will return the [6, Max) restriction back to runner(in your case that's Dataflow) and the runner will reschedule [6, MAX) based on its scheduling strategy. That's why if you want to use resume(), you need to implement trySplit. That's also why trySplit and checkDone also make sense on Unbounded restriction. Perhaps it would be better to explain in terms of why I'm trying to do > this. If the subscription has not received any data in a while, or is > receiving data infrequently, I want to enable dataflow to scale down to 1 > worker, but there will be no need to call "tryClaim" if there is no new > data from Pub/Sub Lite. All I want to do is, if data is arriving > infrequently, give dataflow the opportunity to scale my job down. I'm not > actually done with the data because, as new data can always be published in > the future, we can't know that, and I'm trying to avoid needing to > implement bounded reads to artificially produce sub-windows when an > unbounded output is much more natural. So you are referring to the resume use case. Please note that even though you are returning resume() from your SDF, that doesn't means Dataflow will guarantee that the worker will be downscaled to 1. But resume() indeed can help you free some workers to process other work, compared to having your SDF doing busy wait. This is good to know. So to rephrase, could I periodically call > tryClaim(<last sent message or getFrom if none yet>) to yield control back > to the runtime? You can do so by implementing RestrictionTracker.trySplit() and using resume(). You may also want to take a look at Kafka example[1]. Hope that is helpful. [1] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java On Fri, Nov 27, 2020 at 10:34 PM Daniel Collins <dpcoll...@google.com> wrote: > > There is an update-to-date sdf programming guide[1] and typically [2] is > talking about SDF initiated-checkpointing > > To the extent I can see, this never mentions the restriction that you > <need> to implement a split() that returns a bounded restriction if > returning resume() from an SDF. Nor does this restriction particularly make > sense if the range being processed is itself unbounded? Perhaps you would > consider not calling checkDone() on resume() if the restriction provided to > the runner is unbounded since it would be unreasonable to complete an > unbounded restriction? > > > It depends on the definition of no new data to enable rescheduling. > > Perhaps it would be better to explain in terms of why I'm trying to do > this. If the subscription has not received any data in a while, or is > receiving data infrequently, I want to enable dataflow to scale down to 1 > worker, but there will be no need to call "tryClaim" if there is no new > data from Pub/Sub Lite. All I want to do is, if data is arriving > infrequently, give dataflow the opportunity to scale my job down. I'm not > actually done with the data because, as new data can always be published in > the future, we can't know that, and I'm trying to avoid needing to > implement bounded reads to artificially produce sub-windows when an > unbounded output is much more natural. > > > Please note that the when an SDF is processing one element restriction > pair, the start of the restriction is never changed. You will always get > the same offset when you call currentRestriction().getFrom(). > > This is good to know. So to rephrase, could I periodically call > tryClaim(<last sent message or getFrom if none yet>) to yield control back > to the runtime? > > -Dan > > > > On Sat, Nov 28, 2020 at 1:21 AM Boyuan Zhang <boyu...@google.com> wrote: > >> > Okay, is this documented anywhere? In particular, >> https://s.apache.org/splittable-do-fn seems out of date, since it >> implies resume() should be returned when tryClaim returns false. >> >> There is an update-to-date sdf programming guide[1] and typically [2] is >> talking about SDF initiated-checkpointing. And stop() should be returned >> when tryClaim returns false where resume() is expected to return when the >> restriction is not fully processed and you want to defer processing in the >> future. >> >> > If this is the case, is there any way I can yield control to the >> runtime if I have no new data to enable rescheduling? For example, can I >> call tracker.tryClaim(tracker.currentRestriction().getFrom()) ? >> >> It depends on the definition of no new data to enable rescheduling. >> If you believe that you are done with current restriction even though you >> are not reaching to the end of restriction, you can specially say I'm done >> with current one by calling restrictionTracker.tryClaim(MAX_LONG)(or >> tryClaim(restriction.getTo) if you are sure your end of restriction is not >> changed by any splitting). >> If you just want to re-process the rest of the restriction after a >> certain time, e.g, 5 mins, 30mins and so on, you need to implement the >> trySplit and return resume(duration) when you want to resume. >> >> Please note that the when an SDF is processing one element restriction >> pair, the start of the restriction is never changed. You will always get >> the same offset when you call currentRestriction().getFrom(). >> >> >> [1] >> https://beam.apache.org/documentation/programming-guide/#splittable-dofns >> [2] >> https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint >> >> On Fri, Nov 27, 2020 at 10:07 PM Daniel Collins <dpcoll...@google.com> >> wrote: >> >>> > you can either never return resume() from your SDF or implement >>> suitable trySplit() logic for your RestrictionTracker >>> >>> Okay, is this documented anywhere? In particular, >>> https://s.apache.org/splittable-do-fn seems out of date, since it >>> implies resume() should be returned when tryClaim returns false. >>> >>> If this is the case, is there any way I can yield control to the runtime >>> if I have no new data to enable rescheduling? For example, can I call >>> tracker.tryClaim(tracker.currentRestriction().getFrom()) ? >>> >>> On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <boyu...@google.com> >>> wrote: >>> >>>> > IIUC, this should never happen as long as I return null to trySplit. >>>> Is this not the case? (trySplit implementation below) >>>> >>>> I noticed that in your implementation you return null for your >>>> RestrictionTracker.trySplit. That means you cannot return resume() from >>>> your SplittableDoFn.process() body since resume() means >>>> performing SplittableDoFn self-initiated checkpointing and >>>> deferring processing residuals. >>>> >>>> In your case, you can either never return resume() from your SDF or >>>> implement suitable trySplit() logic for your RestrictionTracker. For >>>> example, KafkaIO is using GrowableOffsetRangeTracker[1] to track an >>>> infinite restriction. >>>> >>>> [1] >>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java >>>> >>>> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <dpcoll...@google.com> >>>> wrote: >>>> >>>>> > Please note that your current restriction might be changed to a >>>>> finite restriction during processing one bundle if you do SplittableDoFn >>>>> self-initiated checkpointing or any runner issued splits >>>>> >>>>> IIUC, this should never happen as long as I return null to trySplit. >>>>> Is this not the case? (trySplit implementation below) >>>>> >>>>> @Override >>>>> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) >>>>> { return null; } >>>>> >>>>> > what will you do if we reach the finally block? >>>>> >>>>> At that point an exception is being thrown out of the processElement >>>>> function. The answer to that would be "what will the runtime do if an >>>>> exception is thrown out of the processElement function" >>>>> >>>>> > Open a WIP PR >>>>> >>>>> I have, but I'm staging changes in a separate repo. See >>>>> https://github.com/googleapis/java-pubsublite/pull/390 (although this >>>>> incorporates other changes, see PubsubLitePartitionSdf.java >>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1> >>>>> and PubsubLiteOffsetRangeTracker.java >>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67> >>>>> for >>>>> the sdf and restriction tracker implementations) >>>>> >>>>> -Dan >>>>> >>>>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <boyu...@google.com> >>>>> wrote: >>>>> >>>>>> >>>>>> >>>>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <dpcoll...@google.com> >>>>>> wrote: >>>>>> >>>>>>> Hello Boyuan, >>>>>>> >>>>>>> Responses inline. >>>>>>> >>>>>>> > The checkDone is invoked by the SDK harness to guarantee that when >>>>>>> you exit you SplittableDoFn.process you must have completed all the >>>>>>> work in >>>>>>> the current restriction >>>>>>> >>>>>>> This is impossible with unbounded restrictions since, by definition, >>>>>>> all work cannot be completed. >>>>>>> >>>>>> Please note that your current restriction might be changed to a >>>>>> finite restriction during processing one bundle if you do SplittableDoFn >>>>>> self-initiated checkpointing or any runner issued splits. >>>>>> >>>>>> >>>>>>> >>>>>>> > In your case, it seems like after you do tryClaim(3188439), you >>>>>>> return stop() directly from your SplittableDoFn.process function >>>>>>> >>>>>>> This is not true. The code in question is below. stop() is only >>>>>>> returned if tryClaim returns false. >>>>>>> >>>>>>> -Dan >>>>>>> >>>>>>> ``` >>>>>>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) { >>>>>>> >>>>>> logger.atWarning().log("Failed to claim initial restriction for >>>>>>> partition " + partition); >>>>>>> return ProcessContinuation.stop(); >>>>>>> } >>>>>>> sleepTimeRemaining = maxSleepTime; >>>>>>> Committer committer = committerSupplier.apply(partition); >>>>>>> committer.startAsync().awaitRunning(); >>>>>>> try (PullSubscriber<SequencedMessage> subscriber = >>>>>>> subscriberFactory.apply(partition, >>>>>>> Offset.of(tracker.currentRestriction().getFrom()))) { >>>>>>> while (true) { >>>>>>> List<SequencedMessage> messages = doPoll(subscriber); >>>>>>> // We polled for as long as possible, yield to the runtime >>>>>>> to allow it to reschedule us on >>>>>>> // a new task. >>>>>>> if (messages.isEmpty()) { >>>>>>> logger.atWarning().log("Yielding due to timeout on >>>>>>> partition " + partition); >>>>>>> return ProcessContinuation.resume(); >>>>>>> } >>>>>>> long lastOffset = >>>>>>> Iterables.getLast(messages).offset().value(); >>>>>>> if (tracker.tryClaim(lastOffset)) { >>>>>>> messages.forEach( >>>>>>> message -> >>>>>>> receiver.outputWithTimestamp( >>>>>>> message, new >>>>>>> Instant(Timestamps.toMillis(message.publishTime())))); >>>>>>> committer.commitOffset(Offset.of(lastOffset + 1)).get(); >>>>>>> } else { >>>>>>> logger.atWarning().log("Stopping partition " + partition); >>>>>>> return ProcessContinuation.stop(); >>>>>>> } >>>>>>> } >>>>>>> } finally { >>>>>>> committer.stopAsync().awaitTerminated(); >>>>>>> } >>>>>>> ``` >>>>>>> >>>>>> From your code, what will you do if we reach the finally block? Would >>>>>> you like to open a WIP PR to show more details? >>>>>> >>>>>> >>>>>>> >>>>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <boyu...@google.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Daniel, >>>>>>>> >>>>>>>> The checkDone is invoked by the SDK harness to guarantee that when >>>>>>>> you exit you SplittableDoFn.process(either you return stop() or >>>>>>>> resume()), >>>>>>>> you must have completed all the work in the current restriction. This >>>>>>>> is >>>>>>>> one of major ways for SplittableDoFn to prevent data loss. >>>>>>>> >>>>>>>> In your case, it seems like after you do tryClaim(3188439), you >>>>>>>> return stop() directly from your SplittableDoFn.process function. >>>>>>>> That's >>>>>>>> not a correct way when working with restriction and restriction >>>>>>>> tracker. >>>>>>>> You should either return resume() to perform SplittableDoFn initiated >>>>>>>> checkpoint to defer processing restriction [3188439, >>>>>>>> 9223372036854775807), >>>>>>>> or you should return stop() only when you have tryClaim() return False. >>>>>>>> >>>>>>>> >>>>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins < >>>>>>>> dpcoll...@google.com> wrote: >>>>>>>> >>>>>>>>> Hello all, >>>>>>>>> >>>>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into >>>>>>>>> the following error on dataflow with a RestrictionTracker returning >>>>>>>>> UNBOUNDED to isBounded. It looks like calls are being made to >>>>>>>>> `checkDone`, >>>>>>>>> but looking at the documentation of `checkDone`, I don't think >>>>>>>>> there's any >>>>>>>>> rational thing I can do in this case. Does anyone know what should be >>>>>>>>> done >>>>>>>>> for this method? >>>>>>>>> >>>>>>>>> The following exist in the RestrictionTracker javadoc: >>>>>>>>> -Must throw an exception with an informative error message, if >>>>>>>>> there is still any unclaimed work remaining in the restriction. >>>>>>>>> (there is, >>>>>>>>> the restriction is unbounded) >>>>>>>>> -{@link RestrictionTracker#checkDone} MUST succeed >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> >>>>>>>>> Dan >>>>>>>>> >>>>>>>>> "Error message from worker: java.lang.IllegalStateException: Last >>>>>>>>> attempted offset was 3188438 in range [1998348, 9223372036854775807), >>>>>>>>> claiming work in [3188439, 9223372036854775807) was not attempted >>>>>>>>> >>>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862) >>>>>>>>> >>>>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117) >>>>>>>>> >>>>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60) >>>>>>>>> >>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77) >>>>>>>>> >>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226) >>>>>>>>> >>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)" >>>>>>>>> >>>>>>>>