> you can either never return resume() from your SDF or implement suitable trySplit() logic for your RestrictionTracker
Okay, is this documented anywhere? In particular,
https://s.apache.org/splittable-do-fn seems out of date, since it implies
resume() should be returned when tryClaim returns false. If this is the case,
is there any way I can yield control to the runtime when I have no new data,
to enable rescheduling? For example, can I call
tracker.tryClaim(tracker.currentRestriction().getFrom())?

On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <[email protected]> wrote:

> > IIUC, this should never happen as long as I return null to trySplit. Is
> > this not the case? (trySplit implementation below)
>
> I noticed that in your implementation you return null from your
> RestrictionTracker.trySplit. That means you cannot return resume() from
> your SplittableDoFn.process() body, since resume() means performing
> SplittableDoFn self-initiated checkpointing and deferring processing of
> residuals.
>
> In your case, you can either never return resume() from your SDF or
> implement suitable trySplit() logic for your RestrictionTracker. For
> example, KafkaIO uses GrowableOffsetRangeTracker [1] to track an
> infinite restriction.
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
>
> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <[email protected]> wrote:
>
>> > Please note that your current restriction might be changed to a finite
>> > restriction during processing one bundle if you do SplittableDoFn
>> > self-initiated checkpointing or any runner issued splits
>>
>> IIUC, this should never happen as long as I return null to trySplit. Is
>> this not the case? (trySplit implementation below)
>>
>> @Override
>> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>>   return null;
>> }
>>
>> > what will you do if we reach the finally block?
>>
>> At that point an exception is being thrown out of the processElement
>> function. The answer to that would be "what will the runtime do if an
>> exception is thrown out of the processElement function?"
>>
>> > Open a WIP PR
>>
>> I have, but I'm staging changes in a separate repo. See
>> https://github.com/googleapis/java-pubsublite/pull/390 (although this
>> incorporates other changes; see PubsubLitePartitionSdf.java
>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1>
>> and PubsubLiteOffsetRangeTracker.java
>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67>
>> for the SDF and restriction tracker implementations).
>>
>> -Dan
>>
>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <[email protected]> wrote:
>>
>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <[email protected]> wrote:
>>>
>>>> Hello Boyuan,
>>>>
>>>> Responses inline.
>>>>
>>>> > The checkDone is invoked by the SDK harness to guarantee that when
>>>> > you exit your SplittableDoFn.process you must have completed all the
>>>> > work in the current restriction
>>>>
>>>> This is impossible with unbounded restrictions since, by definition,
>>>> all work cannot be completed.
>>>>
>>> Please note that your current restriction might be changed to a finite
>>> restriction during processing of one bundle if you do SplittableDoFn
>>> self-initiated checkpointing or any runner-issued splits.
>>>
>>>> > In your case, it seems like after you do tryClaim(3188439), you
>>>> > return stop() directly from your SplittableDoFn.process function
>>>>
>>>> This is not true. The code in question is below. stop() is only
>>>> returned if tryClaim returns false.
>>>>
>>>> -Dan
>>>>
>>>> ```
>>>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>>>   logger.atWarning().log("Failed to claim initial restriction for partition " + partition);
>>>>   return ProcessContinuation.stop();
>>>> }
>>>> sleepTimeRemaining = maxSleepTime;
>>>> Committer committer = committerSupplier.apply(partition);
>>>> committer.startAsync().awaitRunning();
>>>> try (PullSubscriber<SequencedMessage> subscriber =
>>>>     subscriberFactory.apply(
>>>>         partition, Offset.of(tracker.currentRestriction().getFrom()))) {
>>>>   while (true) {
>>>>     List<SequencedMessage> messages = doPoll(subscriber);
>>>>     // We polled for as long as possible; yield to the runtime to allow
>>>>     // it to reschedule us on a new task.
>>>>     if (messages.isEmpty()) {
>>>>       logger.atWarning().log("Yielding due to timeout on partition " + partition);
>>>>       return ProcessContinuation.resume();
>>>>     }
>>>>     long lastOffset = Iterables.getLast(messages).offset().value();
>>>>     if (tracker.tryClaim(lastOffset)) {
>>>>       messages.forEach(
>>>>           message ->
>>>>               receiver.outputWithTimestamp(
>>>>                   message, new Instant(Timestamps.toMillis(message.publishTime()))));
>>>>       committer.commitOffset(Offset.of(lastOffset + 1)).get();
>>>>     } else {
>>>>       logger.atWarning().log("Stopping partition " + partition);
>>>>       return ProcessContinuation.stop();
>>>>     }
>>>>   }
>>>> } finally {
>>>>   committer.stopAsync().awaitTerminated();
>>>> }
>>>> ```
>>>>
>>> From your code, what will you do if we reach the finally block? Would
>>> you like to open a WIP PR to show more details?
>>>
>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <[email protected]> wrote:
>>>>
>>>>> Hi Daniel,
>>>>>
>>>>> The checkDone is invoked by the SDK harness to guarantee that when you
>>>>> exit your SplittableDoFn.process (either you return stop() or
>>>>> resume()), you must have completed all the work in the current
>>>>> restriction. This is one of the major ways for SplittableDoFn to
>>>>> prevent data loss.
>>>>>
>>>>> In your case, it seems like after you do tryClaim(3188439), you return
>>>>> stop() directly from your SplittableDoFn.process function. That's not
>>>>> a correct way of working with a restriction and restriction tracker.
>>>>> You should either return resume() to perform a SplittableDoFn-initiated
>>>>> checkpoint to defer processing restriction [3188439, 9223372036854775807),
>>>>> or you should return stop() only when you have tryClaim() return false.
>>>>>
>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <[email protected]> wrote:
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into the
>>>>>> following error on Dataflow with a RestrictionTracker returning
>>>>>> UNBOUNDED from isBounded. It looks like calls are being made to
>>>>>> `checkDone`, but looking at the documentation of `checkDone`, I don't
>>>>>> think there's any rational thing I can do in this case. Does anyone
>>>>>> know what should be done for this method?
>>>>>>
>>>>>> The following exist in the RestrictionTracker javadoc:
>>>>>> - Must throw an exception with an informative error message, if there
>>>>>>   is still any unclaimed work remaining in the restriction. (there
>>>>>>   is, the restriction is unbounded)
>>>>>> - {@link RestrictionTracker#checkDone} MUST succeed
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Dan
>>>>>>
>>>>>> "Error message from worker: java.lang.IllegalStateException: Last
>>>>>> attempted offset was 3188438 in range [1998348, 9223372036854775807),
>>>>>> claiming work in [3188439, 9223372036854775807) was not attempted
>>>>>>
>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
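The checkpoint-style trySplit that Boyuan recommends can be sketched as a small self-contained model. Everything below is illustrative rather than the real Beam API (no SplitResult, no OffsetRange): it only shows the mechanic that makes resume() legal, namely splitting an unbounded range into a finite primary [from, lastClaimed + 1) and an unbounded residual, so that checkDone() runs against the finite primary only. Beam's real counterpart is GrowableOffsetRangeTracker.

```java
// Simplified, self-contained model of a checkpoint-capable tracker for an
// unbounded offset range. Class and method names are illustrative, not the
// real Beam API; Beam's real counterpart is GrowableOffsetRangeTracker.
final class UnboundedOffsetTracker {
  static final long INFINITY = Long.MAX_VALUE;

  private final long from;         // inclusive start of the restriction
  private long lastAttempted = -1; // last offset passed to tryClaim, -1 if none

  UnboundedOffsetTracker(long from) {
    this.from = from;
  }

  boolean tryClaim(long offset) {
    lastAttempted = offset;
    return offset < INFINITY; // an unbounded range never runs out of offsets
  }

  // Checkpoint split: the primary becomes the finite range [from, splitPoint)
  // and the residual [splitPoint, INFINITY) is handed back to the runner for
  // rescheduling. Returning a non-null split here is what makes
  // ProcessContinuation.resume() legal, because checkDone() then runs
  // against the finite primary only.
  long[] trySplit() {
    long splitPoint = (lastAttempted < from) ? from : lastAttempted + 1;
    return new long[] {splitPoint, splitPoint}; // {primaryEnd, residualStart}
  }

  // Mirrors the spirit of OffsetRangeTracker.checkDone: succeeds only if the
  // primary is empty or every offset in [from, end) was attempted.
  void checkDone(long end) {
    if (from >= end) {
      return; // empty primary: trivially done
    }
    if (lastAttempted + 1 < end) {
      throw new IllegalStateException(
          "claiming work in [" + (lastAttempted + 1) + ", " + end + ") was not attempted");
    }
  }
}
```

After a split, checkDone against the primary's end passes even though the original range was unbounded, which is exactly why a null-returning trySplit plus resume() cannot work: there is never a finite primary to check.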
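The precondition that fires in the stack trace can also be reproduced in isolation. The sketch below is simplified (the real check is a Preconditions.checkState call inside Beam's OffsetRangeTracker.checkDone, and the method signature here is invented for illustration), but it shows why the numbers 3188439 and 9223372036854775807 appear in the error: with an unbounded upper bound, the "everything attempted" condition can never hold.

```java
// Minimal reproduction of the checkDone failure reported by Dataflow.
// Simplified sketch; the real check is a Preconditions.checkState call in
// Beam's OffsetRangeTracker.checkDone, not a String-returning method.
final class CheckDoneDemo {
  // Returns "ok" if every offset in [from, to) was attempted, otherwise an
  // error message in the spirit of the one in the stack trace above.
  static String checkDone(long from, long to, long lastAttempted) {
    if (from >= to) {
      return "ok"; // empty range: trivially done
    }
    if (lastAttempted + 1 < to) {
      return "claiming work in [" + (lastAttempted + 1) + ", " + to + ") was not attempted";
    }
    return "ok";
  }

  public static void main(String[] args) {
    // The values from the error: with an upper bound of Long.MAX_VALUE the
    // check can never pass, so the SDF must split off a finite primary
    // (via a working trySplit) before returning resume() or stop().
    System.out.println(checkDone(1998348L, Long.MAX_VALUE, 3188438L));
  }
}
```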
