On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <[email protected]> wrote:

> Hello Boyuan,
>
> Responses inline.
>
> > The checkDone is invoked by the SDK harness to guarantee that when you
> exit your SplittableDoFn.process you must have completed all the work in the
> current restriction
>
> This is impossible with unbounded restrictions since, by definition, all
> work cannot be completed.
>
Please note that your current restriction can become finite while a bundle
is being processed, either through a SplittableDoFn self-initiated checkpoint
or through a runner-issued split.
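
To illustrate how a checkpoint turns an unbounded restriction into a finite
one, here is a minimal sketch in plain Java. It is a simplified model and not
Beam's OffsetRangeTracker: the class CheckpointSketch and its checkpoint()
method are hypothetical stand-ins for the real tracker and its split logic,
and the offsets are the ones from the error message in this thread.

```java
/** Simplified, hypothetical model of an offset tracker; NOT Beam's OffsetRangeTracker. */
final class CheckpointSketch {
  private final long from;      // inclusive start of the restriction
  private long to;              // exclusive end; shrinks when a checkpoint is taken
  private Long lastAttempted;   // last offset passed to tryClaim, or null if none

  CheckpointSketch(long from, long to) {
    this.from = from;
    this.to = to;
  }

  /** Records the attempt and reports whether the offset lies inside the current restriction. */
  boolean tryClaim(long offset) {
    lastAttempted = offset;
    return offset >= from && offset < to;
  }

  /** Models a checkpoint: keeps the attempted prefix and returns the residual [start, end). */
  long[] checkpoint() {
    long splitPos = (lastAttempted == null) ? from : lastAttempted + 1;
    long[] residual = {splitPos, to};
    to = splitPos; // the current restriction is now finite
    return residual;
  }

  /** Throws if any offset in the current restriction was never attempted. */
  void checkDone() {
    if (from >= to) {
      return; // empty restriction, nothing left to claim
    }
    if (lastAttempted == null || lastAttempted < to - 1) {
      throw new IllegalStateException(
          "Unattempted work remains in [" + from + ", " + to + ")");
    }
  }

  public static void main(String[] args) {
    CheckpointSketch tracker = new CheckpointSketch(1998348L, Long.MAX_VALUE);
    tracker.tryClaim(3188438L);
    try {
      tracker.checkDone(); // fails: the restriction is still unbounded
    } catch (IllegalStateException e) {
      System.out.println("Before checkpoint: " + e.getMessage());
    }
    long[] residual = tracker.checkpoint(); // deferred work: [3188439, Long.MAX_VALUE)
    tracker.checkDone(); // succeeds: the primary restriction now ends at 3188439
    System.out.println("Residual starts at " + residual[0]);
  }
}
```

In this model, checkDone() can only succeed on the unbounded range after the
checkpoint has cut the current restriction down to the offsets that were
already attempted; the rest is handed back as a residual to process later.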


>
> > In your case, it seems like after you do tryClaim(3188439), you return
> stop() directly from your SplittableDoFn.process function
>
> This is not true. The code in question is below. stop() is only returned
> if tryClaim returns false.
>
> -Dan
>
> ```
>     if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>       logger.atWarning().log("Failed to claim initial restriction for partition " + partition);
>       return ProcessContinuation.stop();
>     }
>     sleepTimeRemaining = maxSleepTime;
>     Committer committer = committerSupplier.apply(partition);
>     committer.startAsync().awaitRunning();
>     try (PullSubscriber<SequencedMessage> subscriber =
>         subscriberFactory.apply(partition, Offset.of(tracker.currentRestriction().getFrom()))) {
>       while (true) {
>         List<SequencedMessage> messages = doPoll(subscriber);
>         // We polled for as long as possible, yield to the runtime to allow it to reschedule us on
>         // a new task.
>         if (messages.isEmpty()) {
>           logger.atWarning().log("Yielding due to timeout on partition " + partition);
>           return ProcessContinuation.resume();
>         }
>         long lastOffset = Iterables.getLast(messages).offset().value();
>         if (tracker.tryClaim(lastOffset)) {
>           messages.forEach(
>               message ->
>                   receiver.outputWithTimestamp(
>                       message, new Instant(Timestamps.toMillis(message.publishTime()))));
>           committer.commitOffset(Offset.of(lastOffset + 1)).get();
>         } else {
>           logger.atWarning().log("Stopping partition " + partition);
>           return ProcessContinuation.stop();
>         }
>       }
>     } finally {
>       committer.stopAsync().awaitTerminated();
>     }
> ```
>
Looking at your code, what do you do when the finally block is reached? Would
you like to open a WIP PR to show more details?


>
> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <[email protected]> wrote:
>
>> Hi Daniel,
>>
>> checkDone is invoked by the SDK harness to guarantee that when you exit
>> your SplittableDoFn.process (whether you return stop() or resume()), you
>> have completed all the work in the current restriction. This is one of the
>> major ways SplittableDoFn prevents data loss.
>>
>> In your case, it seems that after you call tryClaim(3188439), you return
>> stop() directly from your SplittableDoFn.process function. That is not the
>> correct way to work with a restriction and its restriction tracker. You
>> should either return resume() to perform a SplittableDoFn-initiated
>> checkpoint that defers processing of the restriction
>> [3188439, 9223372036854775807), or return stop() only after tryClaim() has
>> returned false.
>>
>>
>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <[email protected]>
>> wrote:
>>
>>> Hello all,
>>>
>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into the
>>> following error on Dataflow with a RestrictionTracker that returns
>>> UNBOUNDED from isBounded. It looks like calls are being made to
>>> `checkDone`, but looking at the documentation of `checkDone`, I don't
>>> think there's any reasonable thing I can do in this case. Does anyone know
>>> what this method should do?
>>>
>>> The following statements appear in the RestrictionTracker javadoc:
>>> - Must throw an exception with an informative error message, if there is
>>> still any unclaimed work remaining in the restriction. (There is: the
>>> restriction is unbounded.)
>>> - {@link RestrictionTracker#checkDone} MUST succeed
>>>
>>> Thanks,
>>>
>>> Dan
>>>
>>> "Error message from worker: java.lang.IllegalStateException: Last
>>> attempted offset was 3188438 in range [1998348, 9223372036854775807),
>>> claiming work in [3188439, 9223372036854775807) was not attempted
>>>
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>
>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>
>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
>>>
>>
