On the surface this looked ok, but running it on Dataflow ended up giving
me some new errors.
Essentially UnboundedSourceAsSDFRestrictionTracker.getProgress also uses
currentReader, but it creates new Readers (without checking the cache key).

@Boyuan Zhang <boyu...@google.com> I'm guessing this should be part of this
PR (https://github.com/apache/beam/pull/13592), do you know if there was
something behind this?

Thanks!

Cheers
Alex

On Thu, Jun 24, 2021 at 4:01 PM Alex Koay <alexkoa...@gmail.com> wrote:

> Good news for anybody who's following this.
> I finally had some time today to look into the problem again with some
> fresh eyes and I figured out the problems.
> As I suspected, the issue lies with the cachedReaders and how it uses
> CheckpointMarks.
>
> With the SDF cachedReaders, it serializes the Source and CheckpointMark
> (along with MIN_TIMESTAMP) to create a cache key for the reader so that it
> can be reused later. Sounds good so far.
>
> On the other hand, Solace doesn't make use of the CheckpointMark at all in
> Source.createReader(), opting instead to create a new Reader every time.
> This is because Solace doesn't really have a notion of checkpoints in
> their queue, you just get the next available message always.
> This makes sense to me.
> As a result, whenever createReader() is called, the CheckpointMark that
> comes as a result is always unique.
>
> The other point about Solace is that its Reader acknowledges the messages
> only upon finalization (which should be the case).
>
> So far, I'm re-iterating what I mentioned before. Well, here's the last
> part I has a hunch about but finally had time to confirm today.
>
> While Solace's cached readers do expire after a minute, while they're
> still running, the Solace server helpfully sends some messages (up to 255
> -- I could be wrong about this) to the reader first (which it then waits
> for an acknowledgement).
> Why this happens is because the FlowReceiver that Solace has isn't yet
> closed, the server treats the Reader as still being "active" for all
> intents and purposes.
> These messages, though, never get read at all by Beam, because
> Reader.advance() is never called, and as such it stays as such until the
> identical CheckpointMark is recalled (which it never does, because the
> CheckpointMark has moved on).
>
> In the meantime, the SDF spawns more and more readers over and over again
> (which will sooner or later go hungry) and all these will become cached
> (with some number of unacknowledged messages), because advance() was never
> called on it.
>
> Eventually, when cachedReaders hits 100 or after 60 seconds, the old
> Readers which have some unacknowledged 255 messages will then be closed,
> freeing up the messages to go to the other readers.
> But now here's the kicker, the other ~99 Readers (in each thread) in the
> cache also are active in the eyes of Solace!
> All of them will get some messages, which never get called on, because the
> CheckpointMark has moved on yet again.
> This goes on for eternity, leading to the very small trickle of messages
> coming in after that.
>
> I've currently fixed the problem by marking everything in Solace's
> CheckpointMark as transient, and as a result it will always reuse the
> cached Reader, but I'd like to discuss if there are any better ways to work
> around this.
> I would propose these two ideas as fallback options, especially
> considering existing UnboundedSources.
> 1. make Reader caching optional
> 2. simply always reuse the existing reader
>
> In any case, the problem (as it was) has been resolved.
>
> Cheers
> Alex
>
>
> On Fri, Jun 18, 2021 at 2:31 AM Luke Cwik <lc...@google.com> wrote:
>
>> Yes I was referring to
>> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
>>  since
>> that is responsible for scheduling the bundle finalization. It will only be
>> invoked if the current bundle completes.
>>
>> I would add logging to ParDoEvaluator#finishBundle[1] to see that the
>> bundle is being completed.
>> I would add logging to EvaluationContext#handleResult[2] to see how the
>> bundle completion is being handled at the bundle finalization callback is
>> being invoked.
>>
>> 1:
>> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L267
>> 2:
>> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java#L157
>>
>> On Thu, Jun 17, 2021 at 10:42 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>
>>> Could you be referring to this part?
>>> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
>>>
>>> I've tried fiddling with it and it gave some good results if I recall
>>> correctly.
>>> I didn't mention it earlier because I thought that maybe a shorter
>>> expiry actually causes the readers to expire faster and thus releasing the
>>> unacked messages for another reader to bundle up.
>>>
>>> I can confirm that the CheckpointMark#finalizeCheckpoint() doesn't get
>>> called for at least some time if the bundle size is not maxed (or close to
>>> maxed) out.
>>> I've added logging into finalizeCheckpoint() and don't see it getting
>>> called. It's where the acknowledgements are happening.
>>>
>>> I've actually opened a Google support ticket for this as well, perhaps
>>> you could take a look at it (Case 28209335).
>>> Thanks for your reply, I'll try to debug this further too.
>>>
>>> On Fri, Jun 18, 2021 at 12:49 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> Reducing the bundle size/timeout shouldn't be necessary since when the
>>>> UnboundedSource returns false from advance(), the
>>>> UnboundedSourceAsSDFWrapperFn will schedule a bundle finalization and
>>>> return resume for the process continuation. This should cause
>>>> invokeProcessElement() to complete in
>>>> OutputAndTimeBoundedSplittableProcessElementInvoker and the runner specific
>>>> implementation should finish the current bundle. This will allow the runner
>>>> to do two things:
>>>> 1) Finalize the current bundle
>>>> 2) Schedule the continuation for the checkpoint mark
>>>>
>>>> Based upon your description it looks like for some reason the runner is
>>>> unable to complete the current bundle.
>>>>
>>>> On Thu, Jun 17, 2021 at 2:48 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>
>>>>> Okay, I think I've found the issue, but now I need some help figuring
>>>>> out how to fix the issue.
>>>>>
>>>>> 1. Solace allows a number of unacknowledged messages before it stops
>>>>> sending more messages. This number just so happens to be 10k messages by
>>>>> default (in the queue I am using).
>>>>> 2. The Solace Beam transform (rightly) waits until bundle finalization
>>>>> before acknowledging the messages.
>>>>> 3. Bundle finalization doesn't happen until it either reaches 10k
>>>>> messages or 10s for the DataflowRunner. For the PortableRunner this seems
>>>>> to be 10k or an unknown timeout. This is related to the
>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker.
>>>>> 4. Many readers are created (over and over) due to the
>>>>> UnboundedSourceAsSdfWrapperFn.
>>>>> 5. When a lot of readers are created, they would compete for the
>>>>> messages (in non-exclusive mode), eventually leaving a small number of
>>>>> unacknowledged messages per bundle.
>>>>> 6. The readers are then cached in cachedReaders in the
>>>>> UnboundedSourceAsSdfWrapperFn. A total of 100 readers are cached, and get
>>>>> evicted after a minute. See https://github.com/apache/beam/pull/13592
>>>>> 7. The readers each have a small number of unacknowledged messages
>>>>> which will remain unacknowledged and cannot be given to another consumer
>>>>> until the bundle finalization happens.
>>>>> 8. When bundle finalization happens (possibly after the reader gets
>>>>> evicted), the messages return to the queue, only to get taken by the huge
>>>>> number of other competing readers.
>>>>>
>>>>> At this point, I'm guessing the few methods to fix this are:
>>>>> a. reduce the bundle size / reduce the bundle timeout (which all seem
>>>>> to be hardcoded per runner)
>>>>> b. reduce the number of cached readers / their timeouts (which doesn't
>>>>> seem to be customizable either) so that there would be less contention
>>>>> c. somehow reduce the splitting process and instead reusing existing
>>>>> sources over and over
>>>>>
>>>>> I'd be happy to send pull requests to help fix this issue but perhaps
>>>>> will need some direction as to how I should fix this.
>>>>>
>>>>> On Wed, Jun 16, 2021 at 8:32 PM Alex Koay <alexkoa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Alright, some updates.
>>>>>>
>>>>>> Using DirectRunner helped narrow things down quite a bit. It seems
>>>>>> that the Solace transform is somewhat buggy when used with the
>>>>>> UnboundedSourceAsSDFWrapperFn as it doesn't have a proper CheckpointMark.
>>>>>> Refer to this:
>>>>>> https://github.com/SolaceProducts/solace-apache-beam/blob/d62f5b8e275902197882e90cdf87346438fae9ac/beam-sdks-java-io-solace/src/main/java/com/solace/connector/beam/UnboundedSolaceSource.java#L40
>>>>>>
>>>>>> The source simply creates a new Reader every time createReader() is
>>>>>> called.
>>>>>>
>>>>>> Because of these, the cachedReaders in the
>>>>>> UnboundedSourceAsSDFRestrictionTracker are never purged, resulting in
>>>>>> readers not being closed, but stay in the cache.
>>>>>> Changing the timeout causes the pipeline to continue draining but at
>>>>>> a glacial pace.
>>>>>>
>>>>>> I've still not able to isolate the root cause of why it suddenly
>>>>>> stops reading more data (could be a Solace issue though).
>>>>>>
>>>>>>
>>>>>> Also, trying the easy way out, I've tried running it with 2.24.0 (the
>>>>>> last one without the SDF default Read) in Java and it works perfectly.
>>>>>> Newer versions in Java DirectRunner don't work correctly either.
>>>>>> Unfortunately Dataflow seems to expand the external transform using
>>>>>> the SDF Read version even when using 2.24.0 (I'm not entirely sure why 
>>>>>> this
>>>>>> is the case).
>>>>>>
>>>>>> I feel like I'm almost at the verge of fixing the problem, but at
>>>>>> this point I'm still far from it.
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 16, 2021 at 11:24 AM Alex Koay <alexkoa...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> 1. I'm building a streaming pipeline.
>>>>>>> 2. For the pure Java transforms pipeline I believe it got
>>>>>>> substituted with a Dataflow native Solace transform (it isn't using
>>>>>>> use_runner_v2 as I think Java doesn't support that publicly yet). I used
>>>>>>> the default Java flags with a DataflowRunner.
>>>>>>> 3. I believe it's the source reader that is being created in mass.
>>>>>>>
>>>>>>> Currently I just tested the Python pipeline (with Java Solace
>>>>>>> transform) on the DirectRunner without bounds, and it seems that the 
>>>>>>> issue
>>>>>>> is similarly manifesting. I'm trying to debug it this way for now.
>>>>>>>
>>>>>>> On Wed, Jun 16, 2021 at 9:01 AM Boyuan Zhang <boyu...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> In terms of the odd case you are experiencing,  it seems like you
>>>>>>>> are comparing a pure java pipeline with a cross-language pipeline, 
>>>>>>>> right? I
>>>>>>>> want to learn more details on this case:
>>>>>>>>
>>>>>>>>    - Is this a batch pipeline or a streaming pipeline?
>>>>>>>>    - For your pure java transforms pipeline, do you run the
>>>>>>>>    pipeline with 'use_runner_v2' or 'beam_fn_api' or 
>>>>>>>> 'use_unified_worker'?
>>>>>>>>    - For a large number of consumers, do you mean dataflow workers
>>>>>>>>    or the source reader?
>>>>>>>>
>>>>>>>> If you can share the implementation of the source and the pipeline,
>>>>>>>> that would be really helpful.
>>>>>>>>
>>>>>>>> +Lukasz Cwik <lc...@google.com> for awareness.
>>>>>>>>
>>>>>>>> On Tue, Jun 15, 2021 at 9:50 AM Chamikara Jayalath <
>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jun 15, 2021 at 3:20 AM Alex Koay <alexkoa...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Several questions:
>>>>>>>>>>
>>>>>>>>>> 1. Is there any way to set the log level for the Java workers via
>>>>>>>>>> a Python Dataflow pipeline?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> 2. What is the easiest way to debug an external transform in
>>>>>>>>>> Java? My main pipeline code is in Python.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> In general, debugging a job should be similar to any other
>>>>>>>>> Dataflow job [1]. But some of the SDK options available to the main 
>>>>>>>>> SDK
>>>>>>>>> environment are currently not available to external SDK environments. 
>>>>>>>>> This
>>>>>>>>> includes changing the debug level. So I suggest adding INFO logs 
>>>>>>>>> instead of
>>>>>>>>> changing the debug level if possible.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 3. Are there any edge cases with the UnboundedSourceWrapperFn SDF
>>>>>>>>>> that I should be wary of? I'm currently encountering a odd case (in
>>>>>>>>>> Dataflow) where a Java pipeline runs with only one worker all the way
>>>>>>>>>> reading Solace messages, but with an external transform in Python, it
>>>>>>>>>> generates a large number of consumers and stop reading messages 
>>>>>>>>>> altogether
>>>>>>>>>> about 90% of the way.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> +Boyuan Zhang <boyu...@google.com> might be able to help.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> Cheers
>>>>>>>>>> Alex
>>>>>>>>>>
>>>>>>>>>

Reply via email to