Could you be referring to this part?

I've tried fiddling with it and it gave some good results if I recall
I didn't mention it earlier because I thought that maybe a shorter expiry
actually causes the readers to expire faster and thus releasing the unacked
messages for another reader to bundle up.

I can confirm that the CheckpointMark#finalizeCheckpoint() doesn't get
called for at least some time if the bundle size is not maxed (or close to
maxed) out.
I've added logging into finalizeCheckpoint() and don't see it getting
called. It's where the acknowledgements are happening.

I've actually opened a Google support ticket for this as well, perhaps you
could take a look at it (Case 28209335).
Thanks for your reply, I'll try to debug this further too.

On Fri, Jun 18, 2021 at 12:49 AM Luke Cwik <> wrote:

> Reducing the bundle size/timeout shouldn't be necessary since when the
> UnboundedSource returns false from advance(), the
> UnboundedSourceAsSDFWrapperFn will schedule a bundle finalization and
> return resume for the process continuation. This should cause
> invokeProcessElement() to complete in
> OutputAndTimeBoundedSplittableProcessElementInvoker and the runner specific
> implementation should finish the current bundle. This will allow the runner
> to do two things:
> 1) Finalize the current bundle
> 2) Schedule the continuation for the checkpoint mark
> Based upon your description it looks like for some reason the runner is
> unable to complete the current bundle.
> On Thu, Jun 17, 2021 at 2:48 AM Alex Koay <> wrote:
>> Okay, I think I've found the issue, but now I need some help figuring out
>> how to fix the issue.
>> 1. Solace allows a number of unacknowledged messages before it stops
>> sending more messages. This number just so happens to be 10k messages by
>> default (in the queue I am using).
>> 2. The Solace Beam transform (rightly) waits until bundle finalization
>> before acknowledging the messages.
>> 3. Bundle finalization doesn't happen until it either reaches 10k
>> messages or 10s for the DataflowRunner. For the PortableRunner this seems
>> to be 10k or an unknown timeout. This is related to the
>> OutputAndTimeBoundedSplittableProcessElementInvoker.
>> 4. Many readers are created (over and over) due to the
>> UnboundedSourceAsSdfWrapperFn.
>> 5. When a lot of readers are created, they would compete for the messages
>> (in non-exclusive mode), eventually leaving a small number of
>> unacknowledged messages per bundle.
>> 6. The readers are then cached in cachedReaders in the
>> UnboundedSourceAsSdfWrapperFn. A total of 100 readers are cached, and get
>> evicted after a minute. See
>> 7. The readers each have a small number of unacknowledged messages which
>> will remain unacknowledged and cannot be given to another consumer until
>> the bundle finalization happens.
>> 8. When bundle finalization happens (possibly after the reader gets
>> evicted), the messages return to the queue, only to get taken by the huge
>> number of other competing readers.
>> At this point, I'm guessing the few methods to fix this are:
>> a. reduce the bundle size / reduce the bundle timeout (which all seem to
>> be hardcoded per runner)
>> b. reduce the number of cached readers / their timeouts (which doesn't
>> seem to be customizable either) so that there would be less contention
>> c. somehow reduce the splitting process and instead reusing existing
>> sources over and over
>> I'd be happy to send pull requests to help fix this issue but perhaps
>> will need some direction as to how I should fix this.
>> On Wed, Jun 16, 2021 at 8:32 PM Alex Koay <> wrote:
>>> Alright, some updates.
>>> Using DirectRunner helped narrow things down quite a bit. It seems that
>>> the Solace transform is somewhat buggy when used with the
>>> UnboundedSourceAsSDFWrapperFn as it doesn't have a proper CheckpointMark.
>>> Refer to this:
>>> The source simply creates a new Reader every time createReader() is
>>> called.
>>> Because of these, the cachedReaders in the
>>> UnboundedSourceAsSDFRestrictionTracker are never purged, resulting in
>>> readers not being closed, but stay in the cache.
>>> Changing the timeout causes the pipeline to continue draining but at a
>>> glacial pace.
>>> I've still not able to isolate the root cause of why it suddenly stops
>>> reading more data (could be a Solace issue though).
>>> Also, trying the easy way out, I've tried running it with 2.24.0 (the
>>> last one without the SDF default Read) in Java and it works perfectly.
>>> Newer versions in Java DirectRunner don't work correctly either.
>>> Unfortunately Dataflow seems to expand the external transform using the
>>> SDF Read version even when using 2.24.0 (I'm not entirely sure why this is
>>> the case).
>>> I feel like I'm almost at the verge of fixing the problem, but at this
>>> point I'm still far from it.
>>> On Wed, Jun 16, 2021 at 11:24 AM Alex Koay <> wrote:
>>>> 1. I'm building a streaming pipeline.
>>>> 2. For the pure Java transforms pipeline I believe it got substituted
>>>> with a Dataflow native Solace transform (it isn't using use_runner_v2 as I
>>>> think Java doesn't support that publicly yet). I used the default Java
>>>> flags with a DataflowRunner.
>>>> 3. I believe it's the source reader that is being created in mass.
>>>> Currently I just tested the Python pipeline (with Java Solace
>>>> transform) on the DirectRunner without bounds, and it seems that the issue
>>>> is similarly manifesting. I'm trying to debug it this way for now.
>>>> On Wed, Jun 16, 2021 at 9:01 AM Boyuan Zhang <>
>>>> wrote:
>>>>> In terms of the odd case you are experiencing,  it seems like you are
>>>>> comparing a pure java pipeline with a cross-language pipeline, right? I
>>>>> want to learn more details on this case:
>>>>>    - Is this a batch pipeline or a streaming pipeline?
>>>>>    - For your pure java transforms pipeline, do you run the pipeline
>>>>>    with 'use_runner_v2' or 'beam_fn_api' or 'use_unified_worker'?
>>>>>    - For a large number of consumers, do you mean dataflow workers or
>>>>>    the source reader?
>>>>> If you can share the implementation of the source and the pipeline,
>>>>> that would be really helpful.
>>>>> +Lukasz Cwik <> for awareness.
>>>>> On Tue, Jun 15, 2021 at 9:50 AM Chamikara Jayalath <
>>>>>> wrote:
>>>>>> On Tue, Jun 15, 2021 at 3:20 AM Alex Koay <>
>>>>>> wrote:
>>>>>>> Several questions:
>>>>>>> 1. Is there any way to set the log level for the Java workers via a
>>>>>>> Python Dataflow pipeline?
>>>>>>> 2. What is the easiest way to debug an external transform in Java?
>>>>>>> My main pipeline code is in Python.
>>>>>> In general, debugging a job should be similar to any other Dataflow
>>>>>> job [1]. But some of the SDK options available to the main SDK 
>>>>>> environment
>>>>>> are currently not available to external SDK environments. This includes
>>>>>> changing the debug level. So I suggest adding INFO logs instead of 
>>>>>> changing
>>>>>> the debug level if possible.
>>>>>> [1]
>>>>>>> 3. Are there any edge cases with the UnboundedSourceWrapperFn SDF
>>>>>>> that I should be wary of? I'm currently encountering a odd case (in
>>>>>>> Dataflow) where a Java pipeline runs with only one worker all the way
>>>>>>> reading Solace messages, but with an external transform in Python, it
>>>>>>> generates a large number of consumers and stop reading messages 
>>>>>>> altogether
>>>>>>> about 90% of the way.
>>>>>> +Boyuan Zhang <> might be able to help.
>>>>>>> Thanks!
>>>>>>> Cheers
>>>>>>> Alex

Reply via email to