It makes sense to use the reader cache instead of instantiating a new reader
within getProgress each time.
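
A minimal sketch of what that could look like (the names here are illustrative stand-ins, not Beam's actual API): getProgress resolves its reader through the cache and only constructs one on a miss.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class GetProgressSketch {
  static final Map<String, Object> cachedReaders = new HashMap<>();

  // Resolve the reader through the cache; create one only on a miss.
  static Object readerFor(String cacheKey, Supplier<Object> createReader) {
    return cachedReaders.computeIfAbsent(cacheKey, k -> createReader.get());
  }

  public static void main(String[] args) {
    // Repeated getProgress-style lookups with the same key reuse one reader.
    Object first = readerFor("source+checkpointMark", Object::new);
    Object second = readerFor("source+checkpointMark", Object::new);
    System.out.println(first == second);  // true
  }
}
```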

Feel free to create a JIRA and/or open a PR and send it out for review.

On Thu, Jun 24, 2021 at 5:43 AM Alex Koay <alexkoa...@gmail.com> wrote:

> On the surface this looked ok, but running it on Dataflow ended up giving
> me some new errors.
> Essentially UnboundedSourceAsSDFRestrictionTracker.getProgress also uses
> currentReader, but it creates new Readers (without checking the cache key).
>
> @Boyuan Zhang <boyu...@google.com> I'm guessing this should be part of
> this PR (https://github.com/apache/beam/pull/13592); do you know if there
> was a reason behind this?
>
> Thanks!
>
> Cheers
> Alex
>
> On Thu, Jun 24, 2021 at 4:01 PM Alex Koay <alexkoa...@gmail.com> wrote:
>
>> Good news for anybody who's following this.
>> I finally had some time today to look into the problem again with some
>> fresh eyes, and I figured out the problems.
>> As I suspected, the issue lies with the cachedReaders and how they use
>> CheckpointMarks.
>>
>> For the SDF's cachedReaders, the wrapper serializes the Source and
>> CheckpointMark (along with MIN_TIMESTAMP) to create a cache key for the
>> reader so that it can be reused later. Sounds good so far.
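
To illustrate the scheme described above, here is a simplified sketch (not Beam's actual code) of a cache key built by serializing the source together with its checkpoint mark; any change in the mark's serialized form produces a different key, and therefore a cache miss.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

public class CacheKeySketch {
  // Build a key from the serialized form of the source plus its checkpoint mark.
  static byte[] keyFor(Serializable source, Serializable checkpointMark) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(source);
        oos.writeObject(checkpointMark);
      }
      return bos.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    // A unique mark per createReader() call means a new key every time,
    // so the previously cached reader is never found.
    byte[] k1 = keyFor("source", "mark-1");
    byte[] k2 = keyFor("source", "mark-2");
    System.out.println(Arrays.equals(k1, k2));  // false
  }
}
```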
>>
>> On the other hand, Solace doesn't make use of the CheckpointMark at all
>> in Source.createReader(), opting instead to create a new Reader every time.
>> This is because Solace doesn't really have a notion of checkpoints in
>> its queue; you always just get the next available message.
>> This makes sense to me.
>> As a result, whenever createReader() is called, the resulting
>> CheckpointMark is always unique.
>>
>> The other point about Solace is that its Reader acknowledges messages
>> only upon finalization (as it should).
>>
>> So far, I'm reiterating what I mentioned before. Well, here's the last
>> part I had a hunch about but finally had time to confirm today.
>>
>> While Solace's cached readers do expire after a minute, the Solace
>> server helpfully sends some messages (up to 255 -- I could be wrong about
>> this) to each reader while it's still alive, and then waits for an
>> acknowledgement.
>> This happens because the reader's FlowReceiver isn't yet closed, so the
>> server treats the Reader as still being "active" for all intents and
>> purposes.
>> These messages, though, never get read at all by Beam, because
>> Reader.advance() is never called; they stay stuck until the identical
>> CheckpointMark is recalled, which never happens, because the
>> CheckpointMark has moved on.
>>
>> In the meantime, the SDF spawns more and more readers (which will sooner
>> or later go hungry), and all of these become cached (each with some
>> number of unacknowledged messages), because advance() was never called on
>> them.
>>
>> Eventually, when cachedReaders hits 100 entries or 60 seconds pass, the
>> old Readers holding those ~255 unacknowledged messages are closed,
>> freeing up the messages to go to the other readers.
>> But here's the kicker: the other ~99 Readers (in each thread) in the
>> cache are also active in the eyes of Solace!
>> All of them will get some messages, which never get read, because the
>> CheckpointMark has moved on yet again.
>> This goes on for eternity, leading to the very small trickle of messages
>> coming in after that.
>>
>> I've currently fixed the problem by marking everything in Solace's
>> CheckpointMark as transient; as a result, it will always reuse the
>> cached Reader. But I'd like to discuss whether there are any better ways
>> to work around this.
>> I would propose these two ideas as fallback options, especially
>> considering existing UnboundedSources:
>> 1. make Reader caching optional
>> 2. simply always reuse the existing reader
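
The transient-field workaround can be sketched as follows (the class and field names are hypothetical, not the actual Solace connector code): once the mark's state is transient, every instance serializes identically, so the wrapper's cache key stays stable and the cached reader is reused.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

public class TransientMarkSketch {
  static class CheckpointMark implements Serializable {
    transient long lastReceivedId;  // transient: excluded from serialization
    CheckpointMark(long id) { this.lastReceivedId = id; }
  }

  static byte[] serialize(Serializable s) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(s);
      }
      return bos.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    // Two marks with different in-memory state serialize to identical bytes,
    // so a cache key derived from them no longer changes between bundles.
    byte[] a = serialize(new CheckpointMark(1));
    byte[] b = serialize(new CheckpointMark(2));
    System.out.println(Arrays.equals(a, b));  // true
  }
}
```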
>>
>> In any case, the problem (as it was) has been resolved.
>>
>> Cheers
>> Alex
>>
>>
>> On Fri, Jun 18, 2021 at 2:31 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> Yes, I was referring to
>>> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
>>> since that is responsible for scheduling the bundle finalization. It
>>> will only be invoked if the current bundle completes.
>>>
>>> I would add logging to ParDoEvaluator#finishBundle[1] to see that the
>>> bundle is being completed.
>>> I would add logging to EvaluationContext#handleResult[2] to see how the
>>> bundle completion is being handled and that the bundle finalization
>>> callback is being invoked.
>>>
>>> 1:
>>> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L267
>>> 2:
>>> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java#L157
>>>
>>> On Thu, Jun 17, 2021 at 10:42 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>>
>>>> Could you be referring to this part?
>>>> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
>>>>
>>>> I've tried fiddling with it and it gave some good results if I recall
>>>> correctly.
>>>> I didn't mention it earlier because I thought that maybe a shorter
>>>> expiry actually causes the readers to expire faster, thus releasing the
>>>> unacked messages for another reader to bundle up.
>>>>
>>>> I can confirm that the CheckpointMark#finalizeCheckpoint() doesn't get
>>>> called for at least some time if the bundle size is not maxed (or close to
>>>> maxed) out.
>>>> I've added logging into finalizeCheckpoint() and don't see it getting
>>>> called. It's where the acknowledgements are happening.
>>>>
>>>> I've actually opened a Google support ticket for this as well, perhaps
>>>> you could take a look at it (Case 28209335).
>>>> Thanks for your reply, I'll try to debug this further too.
>>>>
>>>> On Fri, Jun 18, 2021 at 12:49 AM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> Reducing the bundle size/timeout shouldn't be necessary since when the
>>>>> UnboundedSource returns false from advance(), the
>>>>> UnboundedSourceAsSDFWrapperFn will schedule a bundle finalization and
>>>>> return resume for the process continuation. This should cause
>>>>> invokeProcessElement() to complete in
>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker, and the
>>>>> runner-specific implementation should finish the current bundle. This
>>>>> will allow the runner to do two things:
>>>>> 1) Finalize the current bundle
>>>>> 2) Schedule the continuation for the checkpoint mark
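
The flow described above can be sketched roughly like this (the interfaces and method names are simplified stand-ins for Beam's internals, not the real API):

```java
public class WrapperFlowSketch {
  interface Reader { boolean advance(); }
  interface Finalizer { void afterBundleCommit(Runnable callback); }
  enum Continuation { RESUME, STOP }

  // Simplified process loop: drain the reader; once advance() returns false,
  // register checkpoint finalization with the runner and ask to resume later,
  // which lets the runner finish (and then finalize) the current bundle.
  static Continuation processElement(Reader reader, Finalizer finalizer,
                                     Runnable finalizeCheckpoint) {
    while (reader.advance()) {
      // emit the current record (omitted)
    }
    finalizer.afterBundleCommit(finalizeCheckpoint);
    return Continuation.RESUME;
  }

  public static void main(String[] args) {
    final boolean[] scheduled = {false};
    Continuation c = processElement(
        () -> false,                      // a reader with no data left
        cb -> { scheduled[0] = true; },   // runner records the callback
        () -> {});                        // the checkpoint finalization itself
    System.out.println(c + " " + scheduled[0]);  // RESUME true
  }
}
```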
>>>>>
>>>>> Based upon your description it looks like for some reason the runner
>>>>> is unable to complete the current bundle.
>>>>>
>>>>> On Thu, Jun 17, 2021 at 2:48 AM Alex Koay <alexkoa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Okay, I think I've found the issue, but now I need some help figuring
>>>>>> out how to fix it.
>>>>>>
>>>>>> 1. Solace allows a number of unacknowledged messages before it stops
>>>>>> sending more messages. This number just so happens to be 10k messages by
>>>>>> default (in the queue I am using).
>>>>>> 2. The Solace Beam transform (rightly) waits until bundle
>>>>>> finalization before acknowledging the messages.
>>>>>> 3. Bundle finalization doesn't happen until it either reaches 10k
>>>>>> messages or 10s for the DataflowRunner. For the PortableRunner this seems
>>>>>> to be 10k or an unknown timeout. This is related to the
>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker.
>>>>>> 4. Many readers are created (over and over) due to the
>>>>>> UnboundedSourceAsSdfWrapperFn.
>>>>>> 5. When a lot of readers are created, they would compete for the
>>>>>> messages (in non-exclusive mode), eventually leaving a small number of
>>>>>> unacknowledged messages per bundle.
>>>>>> 6. The readers are then cached in cachedReaders in the
>>>>>> UnboundedSourceAsSdfWrapperFn. A total of 100 readers are cached, and get
>>>>>> evicted after a minute. See https://github.com/apache/beam/pull/13592
>>>>>> 7. The readers each have a small number of unacknowledged messages
>>>>>> which will remain unacknowledged and cannot be given to another consumer
>>>>>> until the bundle finalization happens.
>>>>>> 8. When bundle finalization happens (possibly after the reader gets
>>>>>> evicted), the messages return to the queue, only to get taken by the huge
>>>>>> number of other competing readers.
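
The eviction behavior in point 6 can be approximated with a toy cache (a rough stand-in, not Beam's implementation; the one-minute time-based expiry is omitted for brevity):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReaderCacheSketch<K, V> extends LinkedHashMap<K, V> {
  private static final int MAX_READERS = 100;

  // Evict the oldest cached reader once the cache exceeds 100 entries.
  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    return size() > MAX_READERS;
  }

  public static void main(String[] args) {
    ReaderCacheSketch<Integer, String> cache = new ReaderCacheSketch<>();
    for (int i = 0; i < 150; i++) {
      cache.put(i, "reader-" + i);  // each put may evict the eldest entry
    }
    System.out.println(cache.size());          // 100
    System.out.println(cache.containsKey(0));  // false: reader 0 was evicted
  }
}
```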
>>>>>>
>>>>>> At this point, I'm guessing the few ways to fix this are:
>>>>>> a. reduce the bundle size / bundle timeout (which all seem to be
>>>>>> hardcoded per runner)
>>>>>> b. reduce the number of cached readers / their timeouts (which don't
>>>>>> seem to be customizable either) so that there would be less contention
>>>>>> c. somehow reduce the splitting process and instead reuse existing
>>>>>> sources over and over
>>>>>>
>>>>>> I'd be happy to send pull requests to help fix this issue but perhaps
>>>>>> will need some direction as to how I should fix this.
>>>>>>
>>>>>> On Wed, Jun 16, 2021 at 8:32 PM Alex Koay <alexkoa...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Alright, some updates.
>>>>>>>
>>>>>>> Using DirectRunner helped narrow things down quite a bit. It seems
>>>>>>> that the Solace transform is somewhat buggy when used with the
>>>>>>> UnboundedSourceAsSDFWrapperFn, as it doesn't have a proper
>>>>>>> CheckpointMark.
>>>>>>> Refer to this:
>>>>>>> https://github.com/SolaceProducts/solace-apache-beam/blob/d62f5b8e275902197882e90cdf87346438fae9ac/beam-sdks-java-io-solace/src/main/java/com/solace/connector/beam/UnboundedSolaceSource.java#L40
>>>>>>>
>>>>>>> The source simply creates a new Reader every time createReader() is
>>>>>>> called.
>>>>>>>
>>>>>>> Because of this, the cachedReaders in the
>>>>>>> UnboundedSourceAsSDFRestrictionTracker are never purged, resulting in
>>>>>>> readers not being closed but staying in the cache.
>>>>>>> Changing the timeout causes the pipeline to continue draining, but at
>>>>>>> a glacial pace.
>>>>>>>
>>>>>>> I've still not been able to isolate the root cause of why it suddenly
>>>>>>> stops reading more data (it could be a Solace issue, though).
>>>>>>>
>>>>>>>
>>>>>>> Also, trying the easy way out, I've tried running it with 2.24.0
>>>>>>> (the last one without the SDF default Read) in Java, and it works
>>>>>>> perfectly. Newer versions in the Java DirectRunner don't work
>>>>>>> correctly either.
>>>>>>> Unfortunately, Dataflow seems to expand the external transform using
>>>>>>> the SDF Read version even when using 2.24.0 (I'm not entirely sure
>>>>>>> why this is the case).
>>>>>>>
>>>>>>> I feel like I'm almost at the verge of fixing the problem, but at
>>>>>>> this point I'm still far from it.
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 16, 2021 at 11:24 AM Alex Koay <alexkoa...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> 1. I'm building a streaming pipeline.
>>>>>>>> 2. For the pure Java transforms pipeline I believe it got
>>>>>>>> substituted with a Dataflow native Solace transform (it isn't using
>>>>>>>> use_runner_v2 as I think Java doesn't support that publicly yet). I 
>>>>>>>> used
>>>>>>>> the default Java flags with a DataflowRunner.
>>>>>>>> 3. I believe it's the source reader that is being created en masse.
>>>>>>>>
>>>>>>>> Currently I just tested the Python pipeline (with Java Solace
>>>>>>>> transform) on the DirectRunner without bounds, and it seems that the 
>>>>>>>> issue
>>>>>>>> is similarly manifesting. I'm trying to debug it this way for now.
>>>>>>>>
>>>>>>>> On Wed, Jun 16, 2021 at 9:01 AM Boyuan Zhang <boyu...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> In terms of the odd case you are experiencing,  it seems like you
>>>>>>>>> are comparing a pure java pipeline with a cross-language pipeline, 
>>>>>>>>> right? I
>>>>>>>>> want to learn more details on this case:
>>>>>>>>>
>>>>>>>>>    - Is this a batch pipeline or a streaming pipeline?
>>>>>>>>>    - For your pure java transforms pipeline, do you run the
>>>>>>>>>    pipeline with 'use_runner_v2' or 'beam_fn_api' or 
>>>>>>>>> 'use_unified_worker'?
>>>>>>>>>    - For a large number of consumers, do you mean dataflow
>>>>>>>>>    workers or the source reader?
>>>>>>>>>
>>>>>>>>> If you can share the implementation of the source and the
>>>>>>>>> pipeline, that would be really helpful.
>>>>>>>>>
>>>>>>>>> +Lukasz Cwik <lc...@google.com> for awareness.
>>>>>>>>>
>>>>>>>>> On Tue, Jun 15, 2021 at 9:50 AM Chamikara Jayalath <
>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 15, 2021 at 3:20 AM Alex Koay <alexkoa...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Several questions:
>>>>>>>>>>>
>>>>>>>>>>> 1. Is there any way to set the log level for the Java workers
>>>>>>>>>>> via a Python Dataflow pipeline?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> 2. What is the easiest way to debug an external transform in
>>>>>>>>>>> Java? My main pipeline code is in Python.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> In general, debugging a job should be similar to any other
>>>>>>>>>> Dataflow job [1]. But some of the SDK options available to the
>>>>>>>>>> main SDK environment are currently not available to external SDK
>>>>>>>>>> environments. This includes changing the debug level. So I suggest
>>>>>>>>>> adding INFO logs instead of changing the debug level if possible.
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 3. Are there any edge cases with the UnboundedSourceWrapperFn
>>>>>>>>>>> SDF that I should be wary of? I'm currently encountering an odd
>>>>>>>>>>> case (in Dataflow) where a Java pipeline reads Solace messages
>>>>>>>>>>> all the way through with only one worker, but with an external
>>>>>>>>>>> transform in Python, it generates a large number of consumers and
>>>>>>>>>>> stops reading messages altogether about 90% of the way through.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> +Boyuan Zhang <boyu...@google.com> might be able to help.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> Cheers
>>>>>>>>>>> Alex
>>>>>>>>>>>
>>>>>>>>>>
