It makes sense to use the reader cache instead of instantiating a new reader within getProgress each time.
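Roughly something like this (an illustrative sketch only; cachedReaders, cacheKey, and createAndCacheNewReader are placeholder names, not the exact Read.java internals):

  public Progress getProgress() {
    // Sketch: consult the reader cache first instead of unconditionally
    // creating a new reader on every progress poll.
    if (currentReader == null) {
      currentReader = cachedReaders.getIfPresent(cacheKey);  // illustrative names
      if (currentReader == null) {
        currentReader = createAndCacheNewReader();  // hypothetical helper
      }
    }
    long backlogBytes = currentReader.getSplitBacklogBytes();
    return Progress.from(/* workCompleted= */ 0, /* workRemaining= */ backlogBytes);
  }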
Feel free to create a JIRA and/or open a PR and send it out for review.

On Thu, Jun 24, 2021 at 5:43 AM Alex Koay <alexkoa...@gmail.com> wrote:

> On the surface this looked OK, but running it on Dataflow ended up giving me some new errors.
> Essentially, UnboundedSourceAsSDFRestrictionTracker.getProgress also uses currentReader, but it creates new Readers (without checking the cache key).
>
> @Boyuan Zhang <boyu...@google.com> I'm guessing this should have been part of this PR (https://github.com/apache/beam/pull/13592); do you know if there was something behind this?
>
> Thanks!
>
> Cheers
> Alex
>
> On Thu, Jun 24, 2021 at 4:01 PM Alex Koay <alexkoa...@gmail.com> wrote:
>
>> Good news for anybody who's following this.
>> I finally had some time today to look into the problem again with fresh eyes, and I figured out the problems.
>> As I suspected, the issue lies with the cachedReaders and how they use CheckpointMarks.
>>
>> For the SDF cachedReaders, the wrapper serializes the Source and CheckpointMark (along with MIN_TIMESTAMP) to create a cache key for the reader so that it can be reused later. Sounds good so far.
>>
>> On the other hand, Solace doesn't make use of the CheckpointMark at all in Source.createReader(), opting instead to create a new Reader every time.
>> This is because Solace doesn't really have a notion of checkpoints in its queues; you always just get the next available message.
>> This makes sense to me.
>> As a result, whenever createReader() is called, the resulting CheckpointMark is always unique.
>>
>> The other point about Solace is that its Reader acknowledges the messages only upon finalization (which should be the case).
>>
>> So far, I'm reiterating what I mentioned before. Well, here's the last part, which I had a hunch about but only had time to confirm today.
>>
>> While Solace's cached readers do expire after a minute, as long as they're still running the Solace server helpfully sends some messages (up to 255 -- I could be wrong about this) to the reader first, for which it then waits for an acknowledgement.
>> This happens because Solace's FlowReceiver isn't yet closed, so the server treats the Reader as still being "active" for all intents and purposes.
>> These messages, though, never get read by Beam at all, because Reader.advance() is never called, and they stay stuck until the identical CheckpointMark is recalled (which never happens, because the CheckpointMark has moved on).
>>
>> In the meantime, the SDF spawns more and more readers over and over again (which will sooner or later go hungry), and all of these become cached (each holding some number of unacknowledged messages), because advance() was never called on them.
>>
>> Eventually, when cachedReaders hits 100 entries or after 60 seconds, the old Readers, each with up to 255 unacknowledged messages, are closed, freeing up those messages to go to the other readers.
>> But here's the kicker: the other ~99 Readers (in each thread) in the cache are also active in the eyes of Solace!
>> All of them will receive some messages, which never get read, because the CheckpointMark has moved on yet again.
>> This goes on forever, which explains the very small trickle of messages coming in after that.
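>>
>> To make the mismatch concrete, the cache key is derived roughly like this (my paraphrase of the wrapper, not the exact code):
>>
>>   // Paraphrased sketch: the reader cache key is the encoded restriction,
>>   // i.e. Source + CheckpointMark + MIN_TIMESTAMP.
>>   UnboundedSourceRestriction<T, MarkT> restriction =
>>       UnboundedSourceRestriction.create(
>>           source, checkpointMark, BoundedWindow.TIMESTAMP_MIN_VALUE);
>>   byte[] cacheKey = CoderUtils.encodeToByteArray(restrictionCoder, restriction);
>>   // Since Solace's createReader() ignores the mark and every new reader
>>   // yields a brand-new unique CheckpointMark, these bytes never match a
>>   // previous key, so the cached reader is never found and never reused.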
>> I've currently fixed the problem by marking everything in Solace's CheckpointMark as transient, and as a result it always reuses the cached Reader, but I'd like to discuss whether there are any better ways to work around this.
>> I would propose these two ideas as fallback options, especially considering existing UnboundedSources:
>> 1. make Reader caching optional
>> 2. simply always reuse the existing reader
>>
>> In any case, the problem (as it was) has been resolved.
>>
>> Cheers
>> Alex
>>
>> On Fri, Jun 18, 2021 at 2:31 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> Yes, I was referring to https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563 since that is responsible for scheduling the bundle finalization. It will only be invoked if the current bundle completes.
>>>
>>> I would add logging to ParDoEvaluator#finishBundle [1] to see that the bundle is being completed.
>>> I would add logging to EvaluationContext#handleResult [2] to see how the bundle completion is being handled and that the bundle finalization callback is being invoked.
>>>
>>> 1: https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L267
>>> 2: https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java#L157
>>>
>>> On Thu, Jun 17, 2021 at 10:42 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>>
>>>> Could you be referring to this part? https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
>>>>
>>>> I've tried fiddling with it, and it gave some good results if I recall correctly.
>>>> I didn't mention it earlier because I thought that maybe a shorter expiry actually causes the readers to expire faster, thus releasing the unacked messages for another reader to bundle up.
>>>>
>>>> I can confirm that CheckpointMark#finalizeCheckpoint() doesn't get called for at least some time if the bundle size is not maxed (or close to maxed) out.
>>>> I've added logging inside finalizeCheckpoint() and don't see it getting called. It's where the acknowledgements happen.
>>>>
>>>> I've actually opened a Google support ticket for this as well; perhaps you could take a look at it (Case 28209335).
>>>> Thanks for your reply, I'll try to debug this further too.
>>>>
>>>> On Fri, Jun 18, 2021 at 12:49 AM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> Reducing the bundle size/timeout shouldn't be necessary, since when the UnboundedSource returns false from advance(), the UnboundedSourceAsSDFWrapperFn will schedule a bundle finalization and return resume for the process continuation. This should cause invokeProcessElement() to complete in OutputAndTimeBoundedSplittableProcessElementInvoker, and the runner-specific implementation should finish the current bundle. This will allow the runner to do two things:
>>>>> 1) Finalize the current bundle
>>>>> 2) Schedule the continuation for the checkpoint mark
>>>>>
>>>>> Based upon your description, it looks like for some reason the runner is unable to complete the current bundle.
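>>>>>
>>>>> For clarity, the expected flow in rough pseudo-Java (a simplified sketch, not the actual Read.java code; parameter lists are elided):
>>>>>
>>>>>   @ProcessElement
>>>>>   public ProcessContinuation processElement(..., BundleFinalizer finalizer) throws IOException {
>>>>>     UnboundedSource.UnboundedReader<T> reader = currentReader;  // from the restriction tracker
>>>>>     while (reader.advance()) {
>>>>>       output(reader.getCurrent(), reader.getCurrentTimestamp());
>>>>>     }
>>>>>     // advance() returned false: register bundle finalization, which is
>>>>>     // what eventually invokes CheckpointMark.finalizeCheckpoint() ...
>>>>>     UnboundedSource.CheckpointMark mark = reader.getCheckpointMark();
>>>>>     finalizer.afterBundleCommit(
>>>>>         Instant.now().plus(Duration.standardMinutes(5)), mark::finalizeCheckpoint);
>>>>>     // ... and return resume so the runner checkpoints and finishes the bundle.
>>>>>     return ProcessContinuation.resume();
>>>>>   }
>>>>>
>>>>> If the bundle never completes, that afterBundleCommit callback never fires, which would also match the missing finalizeCheckpoint() logs you mentioned.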
>>>>> On Thu, Jun 17, 2021 at 2:48 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>>
>>>>>> Okay, I think I've found the issue, but now I need some help figuring out how to fix it.
>>>>>>
>>>>>> 1. Solace allows a number of unacknowledged messages before it stops sending more. This number just so happens to be 10k messages by default (in the queue I am using).
>>>>>> 2. The Solace Beam transform (rightly) waits until bundle finalization before acknowledging the messages.
>>>>>> 3. Bundle finalization doesn't happen until either 10k messages or 10s is reached for the DataflowRunner. For the PortableRunner this seems to be 10k messages or an unknown timeout. This is related to the OutputAndTimeBoundedSplittableProcessElementInvoker.
>>>>>> 4. Many readers are created (over and over) due to the UnboundedSourceAsSdfWrapperFn.
>>>>>> 5. When a lot of readers are created, they compete for the messages (in non-exclusive mode), eventually leaving a small number of unacknowledged messages per bundle.
>>>>>> 6. The readers are then cached in cachedReaders in the UnboundedSourceAsSdfWrapperFn. A total of 100 readers are cached, and they get evicted after a minute. See https://github.com/apache/beam/pull/13592
>>>>>> 7. Each reader holds a small number of unacknowledged messages which remain unacknowledged and cannot be given to another consumer until bundle finalization happens.
>>>>>> 8. When bundle finalization happens (possibly after the reader gets evicted), the messages return to the queue, only to get taken by the huge number of other competing readers.
>>>>>>
>>>>>> At this point, I'm guessing the few ways to fix this are:
>>>>>> a. reduce the bundle size / reduce the bundle timeout (which both seem to be hardcoded per runner)
>>>>>> b. reduce the number of cached readers / their timeouts (which don't seem to be customizable either) so that there would be less contention
>>>>>> c. somehow reduce the splitting and instead reuse existing sources over and over
>>>>>>
>>>>>> I'd be happy to send pull requests to help fix this issue, but I'll perhaps need some direction as to how I should fix it.
>>>>>>
>>>>>> On Wed, Jun 16, 2021 at 8:32 PM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>>>
>>>>>>> Alright, some updates.
>>>>>>>
>>>>>>> Using the DirectRunner helped narrow things down quite a bit. It seems that the Solace transform is somewhat buggy when used with the UnboundedSourceAsSDFWrapperFn, as it doesn't have a proper CheckpointMark.
>>>>>>> Refer to this: https://github.com/SolaceProducts/solace-apache-beam/blob/d62f5b8e275902197882e90cdf87346438fae9ac/beam-sdks-java-io-solace/src/main/java/com/solace/connector/beam/UnboundedSolaceSource.java#L40
>>>>>>>
>>>>>>> The source simply creates a new Reader every time createReader() is called (see the paraphrased sketch below).
>>>>>>>
>>>>>>> Because of this, the cachedReaders in the UnboundedSourceAsSDFRestrictionTracker are never purged, so the readers are not closed but stay in the cache.
>>>>>>> Changing the timeout causes the pipeline to continue draining, but at a glacial pace.
>>>>>>>
>>>>>>> I've still not been able to isolate the root cause of why it suddenly stops reading more data (could be a Solace issue though).
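>>>>>>>
>>>>>>> Here's the createReader() pattern I mentioned above (paraphrased from the linked code; the exact generics/signature may differ):
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public UnboundedSource.UnboundedReader<T> createReader(
>>>>>>>       PipelineOptions options, SolaceCheckpointMark checkpointMark) {
>>>>>>>     // The checkpointMark argument is ignored entirely, so every call
>>>>>>>     // produces a brand-new reader with its own FlowReceiver and,
>>>>>>>     // consequently, a brand-new (unique) CheckpointMark.
>>>>>>>     return new UnboundedSolaceReader<>(this);
>>>>>>>   }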
>>>>>>> Also, trying the easy way out, I've tried running it with 2.24.0 (the last version without the SDF default Read) in Java, and it works perfectly.
>>>>>>> Newer versions in Java on the DirectRunner don't work correctly either.
>>>>>>> Unfortunately, Dataflow seems to expand the external transform using the SDF Read version even when using 2.24.0 (I'm not entirely sure why this is the case).
>>>>>>>
>>>>>>> I feel like I'm on the verge of fixing the problem, but at this point I'm still far from it.
>>>>>>>
>>>>>>> On Wed, Jun 16, 2021 at 11:24 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>>>>
>>>>>>>> 1. I'm building a streaming pipeline.
>>>>>>>> 2. For the pure Java transforms pipeline, I believe it got substituted with a Dataflow-native Solace transform (it isn't using use_runner_v2, as I think Java doesn't support that publicly yet). I used the default Java flags with the DataflowRunner.
>>>>>>>> 3. I believe it's the source reader that is being created en masse.
>>>>>>>>
>>>>>>>> Currently I've just tested the Python pipeline (with the Java Solace transform) on the DirectRunner without bounds, and the issue seems to manifest similarly. I'm trying to debug it this way for now.
>>>>>>>>
>>>>>>>> On Wed, Jun 16, 2021 at 9:01 AM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>
>>>>>>>>> In terms of the odd case you are experiencing, it seems like you are comparing a pure Java pipeline with a cross-language pipeline, right? I want to learn more details about this case:
>>>>>>>>>
>>>>>>>>> - Is this a batch pipeline or a streaming pipeline?
>>>>>>>>> - For your pure Java transforms pipeline, do you run the pipeline with 'use_runner_v2' or 'beam_fn_api' or 'use_unified_worker'?
>>>>>>>>> - For a large number of consumers, do you mean Dataflow workers or the source reader?
>>>>>>>>>
>>>>>>>>> If you can share the implementation of the source and the pipeline, that would be really helpful.
>>>>>>>>>
>>>>>>>>> +Lukasz Cwik <lc...@google.com> for awareness.
>>>>>>>>>
>>>>>>>>> On Tue, Jun 15, 2021 at 9:50 AM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> On Tue, Jun 15, 2021 at 3:20 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Several questions:
>>>>>>>>>>>
>>>>>>>>>>> 1. Is there any way to set the log level for the Java workers via a Python Dataflow pipeline?
>>>>>>>>>>>
>>>>>>>>>>> 2. What is the easiest way to debug an external transform in Java? My main pipeline code is in Python.
>>>>>>>>>>
>>>>>>>>>> In general, debugging a job should be similar to any other Dataflow job [1]. But some of the SDK options available to the main SDK environment are currently not available to external SDK environments. This includes changing the debug level. So I suggest adding INFO logs instead of changing the debug level, if possible.
>>>>>>>>>>
>>>>>>>>>> [1] https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>>>>>>>>
>>>>>>>>>>> 3. Are there any edge cases with the UnboundedSourceWrapperFn SDF that I should be wary of?
>>>>>>>>>>> I'm currently encountering an odd case (in Dataflow) where a Java pipeline runs with only one worker all the way through while reading Solace messages, but with an external transform in Python, it generates a large number of consumers and stops reading messages altogether about 90% of the way through.
>>>>>>>>>>
>>>>>>>>>> +Boyuan Zhang <boyu...@google.com> might be able to help.
>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> Cheers
>>>>>>>>>>> Alex