Hello Boyuan,
Responses inline.
> The checkDone is invoked by the SDK harness to guarantee that when you
> exit your SplittableDoFn.process you must have completed all the work in the
> current restriction
This is impossible with unbounded restrictions since, by definition, not all
of the work can be completed.
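For reference, the failing check can be modeled in a few lines of plain Java. This is a hypothetical, simplified stand-in for OffsetRangeTracker.checkDone, not Beam's class: it assumes only what the error message further down this thread implies, namely that checkDone fails when the last attempted offset is below the range's end minus one, with Long.MAX_VALUE standing in for an unbounded end. Fed the offsets from the stack trace, it prints the same message.

```java
/**
 * Hypothetical, simplified model of the condition behind the checkDone() error.
 * Not Beam's OffsetRangeTracker; names and structure are illustrative only.
 */
final class CheckDoneModel {
  private final long from;
  private final long to; // exclusive end; Long.MAX_VALUE stands in for "unbounded"
  private Long lastAttempted; // null until the first tryClaim()

  CheckDoneModel(long from, long to) {
    this.from = from;
    this.to = to;
  }

  /** Records the attempt and succeeds only if the offset lies in [from, to). */
  boolean tryClaim(long offset) {
    lastAttempted = offset;
    return offset < to;
  }

  /** Throws unless every offset in [from, to) has been attempted. */
  void checkDone() {
    if (from == to) {
      return; // an empty range is trivially done
    }
    if (lastAttempted == null) {
      throw new IllegalStateException("No work was claimed in non-empty range");
    }
    if (lastAttempted < to - 1) {
      throw new IllegalStateException(
          String.format(
              "Last attempted offset was %d in range [%d, %d), "
                  + "claiming work in [%d, %d) was not attempted",
              lastAttempted, from, to, lastAttempted + 1, to));
    }
  }

  public static void main(String[] args) {
    CheckDoneModel tracker = new CheckDoneModel(1998348L, Long.MAX_VALUE);
    tracker.tryClaim(3188438L); // last offset claimed before process() returned
    try {
      tracker.checkDone();
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With an end of Long.MAX_VALUE, no finite sequence of claims ever satisfies the condition, which is the crux of the question.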
> In your case, it seems like after you do tryClaim(3188439), you return
> stop() directly from your SplittableDoFn.process function
This is not true. The code in question is below. stop() is only returned if
tryClaim returns false.
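A sketch of why returning stop() after a failed tryClaim can satisfy checkDone even on an unbounded range. It rests on two assumptions modeled here rather than taken from Beam's source: a failed tryClaim still records the attempted offset, and in practice a claim on a range ending at Long.MAX_VALUE only fails after a split has moved the end down. StopPathModel and shrinkTo are hypothetical names.

```java
/**
 * Hypothetical model of the stop() path: tryClaim() fails only because a split
 * moved the end of the range, and the failed attempt itself satisfies checkDone().
 */
final class StopPathModel {
  private final long from;
  private long to; // exclusive end; starts at Long.MAX_VALUE ("unbounded")
  private Long lastAttempted;

  StopPathModel(long from, long to) {
    this.from = from;
    this.to = to;
  }

  /** Records every attempt, including failed ones, then checks the bound. */
  boolean tryClaim(long offset) {
    lastAttempted = offset;
    return offset < to;
  }

  /** Hypothetical runner-initiated split: the range now ends at splitPos. */
  void shrinkTo(long splitPos) {
    to = splitPos;
  }

  /** The condition checkDone() enforces: all of [from, to) was attempted. */
  boolean isDone() {
    return from == to || (lastAttempted != null && lastAttempted >= to - 1);
  }

  public static void main(String[] args) {
    StopPathModel tracker = new StopPathModel(1998348L, Long.MAX_VALUE);
    tracker.tryClaim(3188438L); // succeeds
    System.out.println(tracker.isDone()); // false: the range is still unbounded
    tracker.shrinkTo(3188439L); // split while process() is running
    boolean claimed = tracker.tryClaim(3188500L);
    System.out.println(claimed + " " + tracker.isDone()); // false true
  }
}
```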
-Dan
```
if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
  logger.atWarning().log("Failed to claim initial restriction for partition " + partition);
  return ProcessContinuation.stop();
}
sleepTimeRemaining = maxSleepTime;
Committer committer = committerSupplier.apply(partition);
committer.startAsync().awaitRunning();
try (PullSubscriber<SequencedMessage> subscriber =
    subscriberFactory.apply(partition, Offset.of(tracker.currentRestriction().getFrom()))) {
  while (true) {
    List<SequencedMessage> messages = doPoll(subscriber);
    // We polled for as long as possible, yield to the runtime to allow it to reschedule us on
    // a new task.
    if (messages.isEmpty()) {
      logger.atWarning().log("Yielding due to timeout on partition " + partition);
      return ProcessContinuation.resume();
    }
    long lastOffset = Iterables.getLast(messages).offset().value();
    if (tracker.tryClaim(lastOffset)) {
      messages.forEach(
          message ->
              receiver.outputWithTimestamp(
                  message, new Instant(Timestamps.toMillis(message.publishTime()))));
      committer.commitOffset(Offset.of(lastOffset + 1)).get();
    } else {
      logger.atWarning().log("Stopping partition " + partition);
      return ProcessContinuation.stop();
    }
  }
} finally {
  committer.stopAsync().awaitTerminated();
}
```
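The resume() branch in the code above relies on a checkpoint. A minimal sketch of the idea, assuming a zero-fraction split that cuts the restriction just after the last attempted offset (CheckpointModel and its checkpoint() method are hypothetical, not Beam's API): the primary shrinks to exactly the work attempted, so checkDone passes even though the original range was unbounded, and the remainder becomes a residual to be processed later.

```java
/**
 * Hypothetical model of a checkpoint (a split at fraction 0) on an offset range.
 * Not Beam's API: the primary keeps exactly the attempted prefix, and the rest of
 * the range is handed back as a residual to be processed after resume().
 */
final class CheckpointModel {
  private final long from;
  private long to; // exclusive end; Long.MAX_VALUE stands in for "unbounded"
  private Long lastAttempted;

  CheckpointModel(long from, long to) {
    this.from = from;
    this.to = to;
  }

  boolean tryClaim(long offset) {
    lastAttempted = offset;
    return offset < to;
  }

  /** Splits just after the last attempt; returns the residual {start, end}, or null if none. */
  long[] checkpoint() {
    long splitPos = (lastAttempted == null) ? from : lastAttempted + 1;
    if (splitPos >= to) {
      return null; // nothing left to defer
    }
    long[] residual = {splitPos, to};
    to = splitPos; // the primary now ends where the attempted work ends
    return residual;
  }

  void checkDone() {
    if (from != to && (lastAttempted == null || lastAttempted < to - 1)) {
      throw new IllegalStateException("Unattempted work remains in [" + from + ", " + to + ")");
    }
  }

  public static void main(String[] args) {
    CheckpointModel tracker = new CheckpointModel(1998348L, Long.MAX_VALUE);
    tracker.tryClaim(3188438L);
    long[] residual = tracker.checkpoint(); // taken when process() returns resume()
    tracker.checkDone(); // passes: the primary is now [1998348, 3188439)
    System.out.println("residual [" + residual[0] + ", " + residual[1] + ")");
  }
}
```

The residual it computes, [3188439, 9223372036854775807), is the same range Boyuan names in the reply quoted below.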
On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang wrote:
> Hi Daniel,
>
> The checkDone is invoked by the SDK harness to guarantee that when you
> exit your SplittableDoFn.process (whether you return stop() or resume()), you
> must have completed all the work in the current restriction. This is one of
> the major ways for SplittableDoFn to prevent data loss.
>
> In your case, it seems like after you do tryClaim(3188439), you return
> stop() directly from your SplittableDoFn.process function. That's not the
> correct way to work with a restriction and restriction tracker. You
> should either return resume() to perform a SplittableDoFn-initiated
> checkpoint that defers processing of the restriction [3188439, 9223372036854775807),
> or return stop() only when tryClaim() has returned false.
>
>
> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins wrote:
>
>> Hello all,
>>
>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into the
>> following error on Dataflow with a RestrictionTracker whose isBounded()
>> returns UNBOUNDED. It looks like calls are being made to `checkDone`, but
>> looking at the documentation of `checkDone`, I don't think there's anything
>> sensible I can do in this case. Does anyone know what this method should
>> do?
>>
>> The RestrictionTracker javadoc states the following:
>> - Must throw an exception with an informative error message, if there is
>> still any unclaimed work remaining in the restriction. (There is; the
>> restriction is unbounded.)
>> - {@link RestrictionTracker#checkDone} MUST succeed
>>
>> Thanks,
>>
>> Dan
>>
>> "Error message from worker: java.lang.IllegalStateException: Last
>> attempted offset was 3188438 in range [1998348, 9223372036854775807),
>> claiming work in [3188439, 9223372036854775807) was not attempted
>>
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
>>
>