Okay, I think I've found the cause, but now I need some help figuring out
how to fix it.

1. Solace allows only a limited number of unacknowledged messages before it
stops sending more. By default this limit happens to be 10k messages (on
the queue I am using).
2. The Solace Beam transform (rightly) waits until bundle finalization
before acknowledging the messages.
3. Bundle finalization doesn't happen until the bundle reaches either 10k
messages or 10s on the DataflowRunner. On the PortableRunner the limits seem
to be 10k messages or an unknown timeout. These limits are related to the
OutputAndTimeBoundedSplittableProcessElementInvoker.
4. Many readers are created (over and over) due to the
UnboundedSourceAsSdfWrapperFn.
5. When many readers are created, they compete for the messages (in
non-exclusive mode), eventually leaving only a small number of
unacknowledged messages per bundle.
6. The readers are then cached in cachedReaders in the
UnboundedSourceAsSdfWrapperFn. Up to 100 readers are cached, and they are
evicted after a minute. See https://github.com/apache/beam/pull/13592
7. Each reader holds a small number of unacknowledged messages, which stay
unacknowledged and cannot be given to another consumer until bundle
finalization happens.
8. When bundle finalization happens (possibly after the reader gets
evicted), the messages return to the queue, only to get taken by the huge
number of other competing readers.
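
To sanity-check points 1, 3 and 5, here is a toy Java model (hypothetical code, not Solace or Beam): the queue stops delivering once maxUnacked messages are outstanding, readers compete round-robin, and a reader only acknowledges when its bundle reaches bundleSize. It deliberately ignores the 10s time bound and reader eviction, which are what eventually free messages in practice, and it treats the unacked limit as shared across the whole queue, which is an assumption about Solace's semantics.

```java
/**
 * Toy model (hypothetical, not Solace or Beam code): a queue that stops delivering
 * once maxUnacked messages are outstanding, and readers that acknowledge only when
 * their bundle reaches bundleSize. Time-based finalization and eviction are ignored.
 */
public class UnackedBudgetModel {
    /** Returned when the queue is still delivering after maxDeliveries messages. */
    static final int NEVER_STALLED = -1;

    /** Counts deliveries until the unacked limit blocks the queue. */
    static int deliveredBeforeStall(int maxUnacked, int readers, int bundleSize, int maxDeliveries) {
        int[] held = new int[readers]; // unacked messages held by each reader's open bundle
        int outstanding = 0;           // unacked messages across the whole queue (assumed shared)
        int delivered = 0;
        int next = 0;                  // readers compete round-robin for messages
        while (outstanding < maxUnacked) {
            if (delivered == maxDeliveries) {
                return NEVER_STALLED;
            }
            held[next]++;
            outstanding++;
            delivered++;
            if (held[next] == bundleSize) {
                // Count bound reached: the bundle finalizes and acks everything it holds.
                outstanding -= held[next];
                held[next] = 0;
            }
            next = (next + 1) % readers;
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Numbers from this thread: 10k unacked messages, 10k-message bundles.
        System.out.println("1 reader:    " + deliveredBeforeStall(10_000, 1, 10_000, 1_000_000));
        System.out.println("100 readers: " + deliveredBeforeStall(10_000, 100, 10_000, 1_000_000));
    }
}
```

With these numbers, a single reader fills its 10k bundle exactly as the limit is reached and keeps flowing, while 100 readers stall after 10,000 deliveries, each holding 100 unacked messages that only the time bound or eviction can release.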

At this point, I'm guessing the possible fixes are:
a. reduce the bundle size / reduce the bundle timeout (which all seem to be
hardcoded per runner)
b. reduce the number of cached readers / their timeouts (which doesn't seem
to be customizable either) so that there would be less contention
c. somehow reduce splitting and instead reuse existing sources over and
over
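
For option (b), a minimal sketch of what a configurable reader cache could look like, assuming the readers are plain java.io.Closeable objects. ReaderCache, its maxSize/ttlMillis parameters and the put/take methods are hypothetical names, not Beam's API; per point 6, Beam's cache hardcodes 100 readers and a one-minute expiry. The sketch closes every reader it evicts, so a reader's connection (and presumably its unacked messages) is released instead of lingering. Time is passed in explicitly so the eviction logic can be tested deterministically.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical reader cache with configurable capacity and TTL (not Beam's API).
 * Evicted readers are closed so their resources are released promptly.
 */
public class ReaderCache<K, R extends Closeable> {
    private static final class Cached<T> {
        final T reader;
        final long insertedAtMillis;

        Cached(T reader, long insertedAtMillis) {
            this.reader = reader;
            this.insertedAtMillis = insertedAtMillis;
        }
    }

    private final int maxSize;
    private final long ttlMillis;
    // Insertion-ordered, so iteration visits the oldest entries first.
    private final LinkedHashMap<K, Cached<R>> entries = new LinkedHashMap<>();

    public ReaderCache(int maxSize, long ttlMillis) {
        if (maxSize < 1 || ttlMillis < 1) {
            throw new IllegalArgumentException("maxSize and ttlMillis must be positive");
        }
        this.maxSize = maxSize;
        this.ttlMillis = ttlMillis;
    }

    /** Caches a reader after a bundle finishes with it. */
    public synchronized void put(K key, R reader, long nowMillis) {
        Cached<R> previous = entries.remove(key);
        if (previous != null && previous.reader != reader) {
            closeQuietly(previous.reader);
        }
        entries.put(key, new Cached<>(reader, nowMillis));
        evict(nowMillis);
    }

    /** Removes and returns the reader for key, or null if absent or expired. */
    public synchronized R take(K key, long nowMillis) {
        evict(nowMillis);
        Cached<R> cached = entries.remove(key);
        return cached == null ? null : cached.reader;
    }

    public synchronized int size() {
        return entries.size();
    }

    // Single pass, oldest first: removes expired entries, and removes the
    // oldest remaining entries while the cache is still over maxSize.
    private void evict(long nowMillis) {
        Iterator<Map.Entry<K, Cached<R>>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            Cached<R> cached = it.next().getValue();
            boolean expired = nowMillis - cached.insertedAtMillis >= ttlMillis;
            if (expired || entries.size() > maxSize) {
                it.remove();
                closeQuietly(cached.reader);
            }
        }
    }

    private static void closeQuietly(Closeable c) {
        try {
            c.close();
        } catch (IOException e) {
            // Best effort: the reader is being discarded either way.
        }
    }
}
```

Taking a reader out of the cache (rather than just looking it up) is a choice made in this sketch so that two bundles never share one reader; whether that matches what the SDF wrapper actually needs is an assumption.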

I'd be happy to send pull requests to fix this, but I'll probably need
some direction on how to approach it.

On Wed, Jun 16, 2021 at 8:32 PM Alex Koay wrote:

> Alright, some updates.
>
> Using the DirectRunner helped narrow things down quite a bit. It seems that
> the Solace transform is somewhat buggy when used with the
> UnboundedSourceAsSDFWrapperFn because it doesn't have a proper CheckpointMark.
> Refer to this:
> https://github.com/SolaceProducts/solace-apache-beam/blob/d62f5b8e275902197882e90cdf87346438fae9ac/beam-sdks-java-io-solace/src/main/java/com/solace/connector/beam/UnboundedSolaceSource.java#L40
>
> The source simply creates a new Reader every time createReader() is called.
>
> Because of this, the cachedReaders in the
> UnboundedSourceAsSDFRestrictionTracker are never purged, so readers are
> never closed and just stay in the cache.
> Changing the timeout lets the pipeline keep draining, but at a glacial
> pace.
>
> I've still not been able to isolate the root cause of why it suddenly
> stops reading more data (it could be a Solace issue, though).
>
>
> Also, taking the easy way out, I tried running it in Java with 2.24.0
> (the last version without the SDF-based Read as the default), and it works
> perfectly. Newer versions don't work correctly on the Java DirectRunner
> either. Unfortunately, Dataflow seems to expand the external transform
> using the SDF Read version even with 2.24.0 (I'm not entirely sure why).
>
> I feel like I'm on the verge of fixing the problem, but at this point
> I'm still far from it.
>
>
> On Wed, Jun 16, 2021 at 11:24 AM Alex Koay wrote:
>
>> 1. I'm building a streaming pipeline.
>> 2. For the pure Java pipeline, I believe it got substituted with a
>> Dataflow-native Solace transform (it isn't using use_runner_v2, as I
>> think Java doesn't support that publicly yet). I used the default Java
>> flags with the DataflowRunner.
>> 3. I believe it's the source reader that is being created en masse.
>>
>> I just tested the Python pipeline (with the Java Solace transform) on the
>> DirectRunner without bounds, and the issue seems to manifest similarly.
>> I'm trying to debug it this way for now.
>>
>> On Wed, Jun 16, 2021 at 9:01 AM Boyuan Zhang wrote:
>>
>>> In terms of the odd case you are experiencing, it seems like you are
>>> comparing a pure Java pipeline with a cross-language pipeline, right? I'd
>>> like to learn more about this case:
>>>
>>>    - Is this a batch pipeline or a streaming pipeline?
>>>    - For your pure java transforms pipeline, do you run the pipeline
>>>    with 'use_runner_v2' or 'beam_fn_api' or 'use_unified_worker'?
>>>    - By a large number of consumers, do you mean Dataflow workers or
>>>    source readers?
>>>
>>> If you can share the implementation of the source and the pipeline, that
>>> would be really helpful.
>>>
>>> +Lukasz Cwik for awareness.
>>>
>>> On Tue, Jun 15, 2021 at 9:50 AM Chamikara Jayalath wrote:
>>>
>>>>
>>>>
>>>> On Tue, Jun 15, 2021 at 3:20 AM Alex Koay wrote:
>>>>
>>>>> Several questions:
>>>>>
>>>>> 1. Is there any way to set the log level for the Java workers via a
>>>>> Python Dataflow pipeline?
>>>>>
>>>>
>>>>> 2. What is the easiest way to debug an external transform in Java? My
>>>>> main pipeline code is in Python.
>>>>>
>>>>
>>>> In general, debugging such a job should be similar to debugging any other
>>>> Dataflow job [1]. However, some of the SDK options available to the main
>>>> SDK environment are currently not available to external SDK environments,
>>>> including changing the log level. So I suggest adding INFO logs instead of
>>>> changing the log level, if possible.
>>>>
>>>> [1]
>>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>>
>>>>
>>>>>
>>>>> 3. Are there any edge cases with the UnboundedSourceWrapperFn SDF that
>>>>> I should be wary of? I'm currently encountering an odd case (in
>>>>> Dataflow) where a Java pipeline runs with only one worker and reads
>>>>> Solace messages all the way through, but with an external transform in
>>>>> Python, it creates a large number of consumers and stops reading
>>>>> messages altogether about 90% of the way through.
>>>>>
>>>>
>>>> +Boyuan Zhang might be able to help.
>>>>
>>>>
>>>>> Thanks!
>>>>>
>>>>> Cheers
>>>>> Alex
>>>>>
>>>>
