This does not appear to work: the checkDone call, as far as I can tell, is
made on the existing range, not the split range, based on the following error:

Error message from worker: java.lang.IllegalStateException: Last attempted
offset was 4601978 in range [2975759, 9223372036854775807), claiming work
in [4601979, 9223372036854775807) was not attempted
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
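
My best guess at a fix, if I'm reading the built-in OffsetRangeTracker correctly,
is that trySplit also needs to narrow the tracker's own range to the primary, so
that a later checkDone() validates against [from, lastClaimedOffset + 1) instead
of the original unbounded range. A rough, untested sketch against my subclass
(`range` and `lastClaimedOffset` are my own field names, not Beam API):

@Override
public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
  if (lastClaimedOffset == null) {
    return null; // Nothing claimed yet, so there is nothing to checkpoint.
  }
  OffsetRange primary = new OffsetRange(range.getFrom(), lastClaimedOffset + 1);
  OffsetRange residual = new OffsetRange(lastClaimedOffset + 1, Long.MAX_VALUE);
  this.range = primary; // currentRestriction() now reports the primary.
  return SplitResult.of(primary, residual);
}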

On Sat, Nov 28, 2020 at 2:18 AM Daniel Collins <[email protected]> wrote:

> Can you confirm that the following implementation of trySplit will work as
> intended (from an OffsetRangeTracker subclass)?
>
> @Override
> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>   return SplitResult.of(
>       new OffsetRange(currentRestriction().getFrom(), lastClaimedOffset + 1),
>       new OffsetRange(lastClaimedOffset + 1, Long.MAX_VALUE));
> }
>
> > It would be nice to update the documentation if that's confusing.
>
> If this is indeed the case, it would be great if you could update the
> documentation to confirm that this pattern ensures there will never be two
> unbounded restrictions being processed by DoFns at the same time.
>
> In addition, I'm not quite sure how this works. When the 'trySplit' call
> occurs, it returns two OffsetRanges, which don't yet include information
> about claimed offsets. How is the first half of this converted to my
> OffsetRangeTracker subclass with the offsets already claimed? Does the
> runtime call (and, importantly, is it required to call):
>
> MyRestrictionTracker tracker = MyDoFn.newTracker(splitResult.getPrimary());
> tracker.tryClaim(previousClaimed);
> tracker.checkDone();
>
> On Sat, Nov 28, 2020 at 2:08 AM Boyuan Zhang <[email protected]> wrote:
>
>> It would be nice to update the documentation if that's confusing.
>>
>> On Fri, Nov 27, 2020 at 11:05 PM Daniel Collins <[email protected]>
>> wrote:
>>
>>> I think the documentation for trySplit() doesn't make it clear that it
>>> supports this use case. In particular this section:
>>>
>>> > This invocation updates the {@link #currentRestriction()} to be the
>>> primary restriction effectively having the current {@link
>>> DoFn.ProcessElement} execution responsible for performing the work that the
>>> primary restriction represents. The residual restriction will be executed
>>> in a separate {@link DoFn.ProcessElement} invocation (likely in a different
>>> process). The work performed by executing the primary and residual
>>> restrictions as separate {@link DoFn.ProcessElement} invocations MUST be
>>> equivalent to the work performed as if this split never occurred.
>>>
>>> Implies that the runner will try to run both restrictions again on
>>> separate workers. This is not the behavior I am looking for, hence my
>>> confusion. Can we change the documentation here to make clear that
>>> checkDone will be called on the primary restriction in the output to ensure
>>> that it is actually completed if the trySplit call was triggered by a call
>>> to resume()?
>>>
>>> On Sat, Nov 28, 2020 at 1:58 AM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> And if you look into the RestrictionTracker javadoc[1], it mentions what it
>>>> means when you return null from trySplit.
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L104-L107
>>>>
>>>> On Fri, Nov 27, 2020 at 10:54 PM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>>
>>>>>> To the extent I can see, this never mentions the restriction that you
>>>>>> <need> to implement a split() that returns a bounded restriction if
>>>>>> returning resume() from an SDF. Nor does this restriction particularly
>>>>>> make sense if the range being processed is itself unbounded? Perhaps you
>>>>>> would consider not calling checkDone() on resume() if the restriction
>>>>>> provided to the runner is unbounded since it would be unreasonable to
>>>>>> complete an unbounded restriction?
>>>>>
>>>>>
>>>>> It seems like you are not familiar with how Beam deals with resume(),
>>>>> so let's start from this part. Let's say that your SDF is processing a
>>>>> restriction of [0, MAX) and so far you have done tryClaim(5), and you
>>>>> want to return resume() at this point. When you return resume() from here,
>>>>> the Beam Java DoFn invoker will know you want to resume and will call your
>>>>> restrictionTracker.trySplit(fractionOfRemainder=0.0) to get the residual
>>>>> from [0, MAX). Ideally, your RestrictionTracker should return [0, 6) as
>>>>> your current restriction and [6, MAX) as the residual. Then the Beam Java
>>>>> DoFn invoker will call checkDone on the restriction [0, 6) to double-check
>>>>> that your SDF has called tryClaim(5), to ensure there is no data loss. Then
>>>>> the SDK will return the [6, MAX) restriction back to the runner (in your
>>>>> case that's Dataflow) and the runner will reschedule [6, MAX) based on its
>>>>> scheduling strategy. That's why, if you want to use resume(), you need to
>>>>> implement trySplit. That's also why trySplit and checkDone make sense on an
>>>>> unbounded restriction.
>>>>>
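>>>>> To make that concrete, here is what that sequence looks like against the
>>>>> built-in OffsetRangeTracker (simplified; the DoFn invoker does more than
>>>>> this, but the tracker calls are the same):
>>>>>
>>>>> import org.apache.beam.sdk.io.range.OffsetRange;
>>>>> import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
>>>>> import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
>>>>>
>>>>> OffsetRangeTracker tracker =
>>>>>     new OffsetRangeTracker(new OffsetRange(0, Long.MAX_VALUE));
>>>>> tracker.tryClaim(5L);                    // your SDF claims offset 5, then returns resume()
>>>>> SplitResult<OffsetRange> split = tracker.trySplit(0.0);  // invoker checkpoints at fraction 0
>>>>> // split.getPrimary() is [0, 6) and is now currentRestriction();
>>>>> // split.getResidual() is [6, MAX) and goes back to the runner.
>>>>> tracker.checkDone();                     // passes: all of [0, 6) was attempted
>>>>>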
>>>>>> Perhaps it would be better to explain in terms of why I'm trying to do
>>>>>> this. If the subscription has not received any data in a while, or is
>>>>>> receiving data infrequently, I want to enable dataflow to scale down to 1
>>>>>> worker, but there will be no need to call "tryClaim" if there is no new
>>>>>> data from Pub/Sub Lite. All I want to do is, if data is arriving
>>>>>> infrequently, give dataflow the opportunity to scale my job down. I'm not
>>>>>> actually done with the data because, as new data can always be published
>>>>>> in the future, we can't know that, and I'm trying to avoid needing to
>>>>>> implement bounded reads to artificially produce sub-windows when an
>>>>>> unbounded output is much more natural.
>>>>>
>>>>>
>>>>> So you are referring to the resume use case. Please note that even
>>>>> though you are returning resume() from your SDF, that doesn't mean
>>>>> Dataflow will guarantee that the job will be downscaled to 1 worker. But
>>>>> resume() can indeed help you free some workers to process other work,
>>>>> compared to having your SDF do a busy wait.
>>>>>
>>>>>> This is good to know. So to rephrase, could I periodically call
>>>>>> tryClaim(<last sent message or getFrom if none yet>) to yield control
>>>>>> back to the runtime?
>>>>>
>>>>>
>>>>> You can do so by implementing RestrictionTracker.trySplit() and using
>>>>> resume().
>>>>>
>>>>> You may also want to take a look at the Kafka example[1]. Hope that is
>>>>> helpful.
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Nov 27, 2020 at 10:34 PM Daniel Collins <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> > There is an up-to-date SDF programming guide[1] and [2] specifically
>>>>>> talks about SDF-initiated checkpointing
>>>>>>
>>>>>> To the extent I can see, this never mentions the restriction that you
>>>>>> <need> to implement a split() that returns a bounded restriction if
>>>>>> returning resume() from an SDF. Nor does this restriction particularly
>>>>>> make sense if the range being processed is itself unbounded? Perhaps you
>>>>>> would consider not calling checkDone() on resume() if the restriction
>>>>>> provided to the runner is unbounded since it would be unreasonable to
>>>>>> complete an unbounded restriction?
>>>>>>
>>>>>> > It depends on the definition of no new data to enable rescheduling.
>>>>>>
>>>>>> Perhaps it would be better to explain in terms of why I'm trying to do
>>>>>> this. If the subscription has not received any data in a while, or is
>>>>>> receiving data infrequently, I want to enable dataflow to scale down to 1
>>>>>> worker, but there will be no need to call "tryClaim" if there is no new
>>>>>> data from Pub/Sub Lite. All I want to do is, if data is arriving
>>>>>> infrequently, give dataflow the opportunity to scale my job down. I'm not
>>>>>> actually done with the data because, as new data can always be published
>>>>>> in the future, we can't know that, and I'm trying to avoid needing to
>>>>>> implement bounded reads to artificially produce sub-windows when an
>>>>>> unbounded output is much more natural.
>>>>>>
>>>>>> > Please note that when an SDF is processing one element/restriction
>>>>>> pair, the start of the restriction is never changed. You will always get
>>>>>> the same offset when you call currentRestriction().getFrom().
>>>>>>
>>>>>> This is good to know. So to rephrase, could I periodically call
>>>>>> tryClaim(<last sent message or getFrom if none yet>) to yield control
>>>>>> back to the runtime?
>>>>>>
>>>>>> -Dan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Nov 28, 2020 at 1:21 AM Boyuan Zhang <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> > Okay, is this documented anywhere? In particular,
>>>>>>> https://s.apache.org/splittable-do-fn seems out of date, since it
>>>>>>> implies resume() should be returned when tryClaim returns false.
>>>>>>>
>>>>>>> There is an up-to-date SDF programming guide[1] and [2] specifically
>>>>>>> talks about SDF-initiated checkpointing. stop() should be returned when
>>>>>>> tryClaim returns false, whereas resume() is expected to be returned when
>>>>>>> the restriction is not fully processed and you want to defer processing
>>>>>>> to the future.
>>>>>>>
>>>>>>> > If this is the case, is there any way I can yield control to the
>>>>>>> runtime if I have no new data to enable rescheduling? For example, can I
>>>>>>> call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>>>>>
>>>>>>> It depends on the definition of no new data to enable rescheduling.
>>>>>>> If you believe that you are done with the current restriction even
>>>>>>> though you have not reached the end of the restriction, you can explicitly
>>>>>>> say "I'm done with the current one" by calling
>>>>>>> restrictionTracker.tryClaim(MAX_LONG) (or tryClaim(restriction.getTo()) if
>>>>>>> you are sure the end of your restriction is not changed by any splitting).
>>>>>>> If you just want to re-process the rest of the restriction after a
>>>>>>> certain time, e.g., 5 mins, 30 mins and so on, you need to implement
>>>>>>> trySplit and return resume(duration) when you want to resume.
>>>>>>>
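>>>>>>> In code, the two options inside your @ProcessElement look roughly like
>>>>>>> this (the condition is just a placeholder for your own "nothing left to
>>>>>>> do here" check, and Duration is org.joda.time.Duration):
>>>>>>>
>>>>>>> // Option 1: declare the restriction finished and stop.
>>>>>>> // tryClaim(Long.MAX_VALUE) returns false on a [from, MAX) range, which is
>>>>>>> // fine -- the point is that the last attempted offset now covers the whole
>>>>>>> // restriction, so checkDone() will pass.
>>>>>>> if (nothingMoreToProcess) {  // placeholder condition
>>>>>>>   tracker.tryClaim(Long.MAX_VALUE);
>>>>>>>   return ProcessContinuation.stop();
>>>>>>> }
>>>>>>> // Option 2: keep the restriction and ask to be re-run later. This requires
>>>>>>> // a working RestrictionTracker.trySplit(), as described above.
>>>>>>> return ProcessContinuation.resume().withResumeDelay(Duration.standardMinutes(5));
>>>>>>>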
>>>>>>> Please note that when an SDF is processing one element/restriction
>>>>>>> pair, the start of the restriction is never changed. You will always get
>>>>>>> the same offset when you call currentRestriction().getFrom().
>>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>>>>>> [2]
>>>>>>> https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint
>>>>>>>
>>>>>>> On Fri, Nov 27, 2020 at 10:07 PM Daniel Collins <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> > you can either never return resume() from your SDF or implement
>>>>>>>> suitable trySplit() logic for your RestrictionTracker
>>>>>>>>
>>>>>>>> Okay, is this documented anywhere? In particular,
>>>>>>>> https://s.apache.org/splittable-do-fn seems out of date, since it
>>>>>>>> implies resume() should be returned when tryClaim returns false.
>>>>>>>>
>>>>>>>> If this is the case, is there any way I can yield control to the
>>>>>>>> runtime if I have no new data to enable rescheduling? For example, can 
>>>>>>>> I
>>>>>>>> call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>>>>>>
>>>>>>>> On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> > IIUC, this should never happen as long as I return null from
>>>>>>>>> trySplit. Is this not the case? (trySplit implementation below)
>>>>>>>>>
>>>>>>>>> I noticed that in your implementation you return null from your
>>>>>>>>> RestrictionTracker.trySplit. That means you cannot return resume() from
>>>>>>>>> your SplittableDoFn.process() body, since resume() means performing a
>>>>>>>>> SplittableDoFn self-initiated checkpoint and deferring processing of the
>>>>>>>>> residual.
>>>>>>>>>
>>>>>>>>> In your case, you can either never return resume() from your SDF
>>>>>>>>> or implement suitable trySplit() logic for your RestrictionTracker. For
>>>>>>>>> example, KafkaIO is using GrowableOffsetRangeTracker[1] to track an
>>>>>>>>> infinite restriction.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
>>>>>>>>>
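>>>>>>>>> A rough sketch of what that looks like in an SDF (the partition type,
>>>>>>>>> start offset lookup, and backlog estimate below are placeholders for
>>>>>>>>> whatever your source provides):
>>>>>>>>>
>>>>>>>>> @GetInitialRestriction
>>>>>>>>> public OffsetRange initialRestriction(@Element Partition partition) {
>>>>>>>>>   return new OffsetRange(startOffsetFor(partition), Long.MAX_VALUE);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> @NewTracker
>>>>>>>>> public GrowableOffsetRangeTracker newTracker(@Restriction OffsetRange restriction) {
>>>>>>>>>   // The estimator tells the tracker how much work exists "right now", so it
>>>>>>>>>   // can size the primary when a split is requested on an infinite range.
>>>>>>>>>   GrowableOffsetRangeTracker.RangeEndEstimator estimator = () -> fetchLatestOffset();
>>>>>>>>>   return new GrowableOffsetRangeTracker(restriction.getFrom(), estimator);
>>>>>>>>> }
>>>>>>>>>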
>>>>>>>>> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> > Please note that your current restriction might be changed to a
>>>>>>>>>> finite restriction while processing one bundle if you do SplittableDoFn
>>>>>>>>>> self-initiated checkpointing or there are any runner-issued splits
>>>>>>>>>>
>>>>>>>>>> IIUC, this should never happen as long as I return null from
>>>>>>>>>> trySplit. Is this not the case? (trySplit implementation below)
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>>>>>>>>>>   return null;
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> > what will you do if we reach the finally block?
>>>>>>>>>>
>>>>>>>>>> At that point an exception is being thrown out of the
>>>>>>>>>> processElement function. The answer to that would be "what will the 
>>>>>>>>>> runtime
>>>>>>>>>> do if an exception is thrown out of the processElement function"
>>>>>>>>>>
>>>>>>>>>> > Open a WIP PR
>>>>>>>>>>
>>>>>>>>>> I have, but I'm staging changes in a separate repo. See
>>>>>>>>>> https://github.com/googleapis/java-pubsublite/pull/390 (although
>>>>>>>>>> this incorporates other changes, see PubsubLitePartitionSdf.java
>>>>>>>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1>
>>>>>>>>>>  and PubsubLiteOffsetRangeTracker.java
>>>>>>>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67>
>>>>>>>>>> for the SDF and restriction tracker implementations)
>>>>>>>>>>
>>>>>>>>>> -Dan
>>>>>>>>>>
>>>>>>>>>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello Boyuan,
>>>>>>>>>>>>
>>>>>>>>>>>> Responses inline.
>>>>>>>>>>>>
>>>>>>>>>>>> > The checkDone is invoked by the SDK harness to guarantee that
>>>>>>>>>>>> when you exit your SplittableDoFn.process you must have completed
>>>>>>>>>>>> all the work in the current restriction
>>>>>>>>>>>>
>>>>>>>>>>>> This is impossible with unbounded restrictions since, by
>>>>>>>>>>>> definition, all work cannot be completed.
>>>>>>>>>>>>
>>>>>>>>>>> Please note that your current restriction might be changed to a
>>>>>>>>>>> finite restriction while processing one bundle if you do SplittableDoFn
>>>>>>>>>>> self-initiated checkpointing or there are any runner-issued splits.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> > In your case, it seems like after you do tryClaim(3188439),
>>>>>>>>>>>> you return stop() directly from your SplittableDoFn.process 
>>>>>>>>>>>> function
>>>>>>>>>>>>
>>>>>>>>>>>> This is not true. The code in question is below. stop() is only
>>>>>>>>>>>> returned if tryClaim returns false.
>>>>>>>>>>>>
>>>>>>>>>>>> -Dan
>>>>>>>>>>>>
>>>>>>>>>>>> ```
>>>>>>>>>>>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>>>>>>>>>>>   logger.atWarning().log("Failed to claim initial restriction for partition " + partition);
>>>>>>>>>>>>   return ProcessContinuation.stop();
>>>>>>>>>>>> }
>>>>>>>>>>>> sleepTimeRemaining = maxSleepTime;
>>>>>>>>>>>> Committer committer = committerSupplier.apply(partition);
>>>>>>>>>>>> committer.startAsync().awaitRunning();
>>>>>>>>>>>> try (PullSubscriber<SequencedMessage> subscriber =
>>>>>>>>>>>>     subscriberFactory.apply(partition, Offset.of(tracker.currentRestriction().getFrom()))) {
>>>>>>>>>>>>   while (true) {
>>>>>>>>>>>>     List<SequencedMessage> messages = doPoll(subscriber);
>>>>>>>>>>>>     // We polled for as long as possible, yield to the runtime to allow it to
>>>>>>>>>>>>     // reschedule us on a new task.
>>>>>>>>>>>>     if (messages.isEmpty()) {
>>>>>>>>>>>>       logger.atWarning().log("Yielding due to timeout on partition " + partition);
>>>>>>>>>>>>       return ProcessContinuation.resume();
>>>>>>>>>>>>     }
>>>>>>>>>>>>     long lastOffset = Iterables.getLast(messages).offset().value();
>>>>>>>>>>>>     if (tracker.tryClaim(lastOffset)) {
>>>>>>>>>>>>       messages.forEach(
>>>>>>>>>>>>           message ->
>>>>>>>>>>>>               receiver.outputWithTimestamp(
>>>>>>>>>>>>                   message, new Instant(Timestamps.toMillis(message.publishTime()))));
>>>>>>>>>>>>       committer.commitOffset(Offset.of(lastOffset + 1)).get();
>>>>>>>>>>>>     } else {
>>>>>>>>>>>>       logger.atWarning().log("Stopping partition " + partition);
>>>>>>>>>>>>       return ProcessContinuation.stop();
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>> } finally {
>>>>>>>>>>>>   committer.stopAsync().awaitTerminated();
>>>>>>>>>>>> }
>>>>>>>>>>>> ```
>>>>>>>>>>>>
>>>>>>>>>>> From your code, what will you do if we reach the finally block?
>>>>>>>>>>> Would you like to open a WIP PR to show more details?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Daniel,
>>>>>>>>>>>>>
>>>>>>>>>>>>> The checkDone is invoked by the SDK harness to guarantee that when you
>>>>>>>>>>>>> exit your SplittableDoFn.process (either you return stop() or resume()),
>>>>>>>>>>>>> you must have completed all the work in the current restriction. This is
>>>>>>>>>>>>> one of the major ways for SplittableDoFn to prevent data loss.
>>>>>>>>>>>>>
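>>>>>>>>>>>>> For an offset range, checkDone is essentially this check (simplified
>>>>>>>>>>>>> from what the built-in OffsetRangeTracker does):
>>>>>>>>>>>>>
>>>>>>>>>>>>> @Override
>>>>>>>>>>>>> public void checkDone() throws IllegalStateException {
>>>>>>>>>>>>>   if (range.getFrom() == range.getTo()) {
>>>>>>>>>>>>>     return; // An empty range is trivially done.
>>>>>>>>>>>>>   }
>>>>>>>>>>>>>   checkState(
>>>>>>>>>>>>>       lastAttemptedOffset != null && lastAttemptedOffset >= range.getTo() - 1,
>>>>>>>>>>>>>       "Work in [%s, %s) was not attempted",
>>>>>>>>>>>>>       lastAttemptedOffset == null ? range.getFrom() : lastAttemptedOffset + 1,
>>>>>>>>>>>>>       range.getTo());
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> So on a restriction ending at Long.MAX_VALUE it can only pass if the
>>>>>>>>>>>>> restriction was first narrowed by a checkpoint/split, or if you attempted
>>>>>>>>>>>>> to claim Long.MAX_VALUE itself.
>>>>>>>>>>>>>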
>>>>>>>>>>>>> In your case, it seems like after you do tryClaim(3188439), you return
>>>>>>>>>>>>> stop() directly from your SplittableDoFn.process function. That's not the
>>>>>>>>>>>>> correct way to work with a restriction and restriction tracker. You should
>>>>>>>>>>>>> either return resume() to perform a SplittableDoFn-initiated checkpoint and
>>>>>>>>>>>>> defer processing the restriction [3188439, 9223372036854775807), or return
>>>>>>>>>>>>> stop() only once tryClaim() has returned false.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into the
>>>>>>>>>>>>>> following error on Dataflow with a RestrictionTracker that returns
>>>>>>>>>>>>>> UNBOUNDED from isBounded. It looks like calls are being made to `checkDone`,
>>>>>>>>>>>>>> but looking at the documentation of `checkDone`, I don't think there's any
>>>>>>>>>>>>>> rational thing I can do in this case. Does anyone know what should be done
>>>>>>>>>>>>>> for this method?
>>>>>>>>>>>>>> for this method?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The following exist in the RestrictionTracker javadoc:
>>>>>>>>>>>>>> -Must throw an exception with an informative error message,
>>>>>>>>>>>>>> if there is still any unclaimed work remaining in the 
>>>>>>>>>>>>>> restriction. (there
>>>>>>>>>>>>>> is, the restriction is unbounded)
>>>>>>>>>>>>>> -{@link RestrictionTracker#checkDone} MUST succeed
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> "Error message from worker: java.lang.IllegalStateException:
>>>>>>>>>>>>>> Last attempted offset was 3188438 in range [1998348, 
>>>>>>>>>>>>>> 9223372036854775807),
>>>>>>>>>>>>>> claiming work in [3188439, 9223372036854775807) was not attempted
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
