Re: When to use Pipeline object
From my understanding, you need the Pipeline mainly for two things:

1. Marking the start of any processing flow (it serves as the PBegin "PCollection"), so that any sources that follow it will run.
2. Running / executing / deploying the pipeline. This happens automatically with the context manager in your example, but otherwise you can call pipeline.run() to get the same effect.

On Mon, Jul 12, 2021 at 10:04 AM wrote:

> Hi,
>
> When using the Python SDK I'm a little confused as to when the pipeline
> object is actually needed. I gather one needs it initially to create a
> PCollection, just because this is where I most often see it used
> consistently, e.g.:
>
> with beam.Pipeline() as pipeline:
>     dict_pc = (
>         pipeline
>         | beam.io.fileio.MatchFiles("./*.csv")
>         | 'Read matched files' >> beam.io.fileio.ReadMatches()
>         | 'Get CSV data as a dict' >> beam.FlatMap(my_csv_reader)
>     )
>
>     # do stuff with dict_pc and other operations
>
> But beyond this, when does one need the pipeline object? It seems like the
> transforms expect a PCollection and output a PCollection, so I'm confused
> and not finding documentation that addresses this. Thank you.
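The second point in the reply can be pictured with a toy stand-in. This is only a sketch: `TinyPipeline` is a hypothetical class written for this example, not Beam's `Pipeline`, and its steps are plain callables. It shows the general pattern the reply describes, where leaving the `with` block triggers the same `run()` you would otherwise call by hand.

```python
class TinyPipeline:
    """Hypothetical stand-in for a pipeline object (not Beam's class)."""

    def __init__(self):
        self.steps = []      # deferred work; nothing executes yet
        self.run_count = 0   # how many times run() was invoked
        self.result = None

    def apply(self, step):
        # Building the graph only records the step.
        self.steps.append(step)
        return self

    def run(self):
        # Execution happens here, over everything recorded so far.
        self.run_count += 1
        value = None
        for step in self.steps:
            value = step(value)
        return value

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Run only if the block finished without an exception.
        if exc_type is None:
            self.result = self.run()
        return False


# Implicit run: leaving the with-block executes the recorded steps.
with TinyPipeline() as implicit:
    implicit.apply(lambda _: [1, 2, 3]).apply(lambda xs: [x * 2 for x in xs])

# Explicit run: same steps, but run() is called by hand.
explicit = TinyPipeline()
explicit.apply(lambda _: [1, 2, 3]).apply(lambda xs: [x * 2 for x in xs])
explicit_result = explicit.run()
```

In both cases the steps execute exactly once; the only difference is who calls `run()`.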
DataflowRunner with External Transform creates UnboundedSourceAsSDFWrapperFn repeatedly
Can any Dataflow experts / Googlers enlighten me as to why this happens?

I shimmed the FnHarness for a Python pipeline with a Java external transform, and it seems that the ProcessBundleHandler receives a different process-bundle-descriptor-%d each time for the same processor.
This defeats the cachedReaders setup (and the previous cachedReaders aren't evicted either; from what I've seen, they remain alive).

(The story about the Solace source continues...)

Thanks
Alex
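One way to picture the symptom, as a toy model only: if every new descriptor id yields a fresh processor instance with its own reader cache, then readers cached under an earlier id are never found again. The names below (`Processor`, `get_processor`) are hypothetical and do not mirror the FnHarness code; whether this is what actually happens on Dataflow is exactly the open question in this message.

```python
class Processor:
    """Hypothetical processor that owns its own reader cache."""

    def __init__(self):
        self.cached_readers = {}

    def reader_for(self, key):
        # Reuse a reader only if this instance has seen the key before.
        if key not in self.cached_readers:
            self.cached_readers[key] = object()
        return self.cached_readers[key]


processors = {}


def get_processor(descriptor_id):
    # Assumption: one processor (and therefore one cache) per descriptor id.
    return processors.setdefault(descriptor_id, Processor())


# Same descriptor id for every bundle: the cached reader is reused.
stable = [
    get_processor("process-bundle-descriptor-1").reader_for("split-0")
    for _ in range(5)
]

# A new descriptor id for every bundle: every lookup misses.
churn = [
    get_processor(f"process-bundle-descriptor-{i}").reader_for("split-0")
    for i in range(2, 7)
]

stable_readers = len({id(r) for r in stable})
churn_readers = len({id(r) for r in churn})
```

With a stable id one reader serves all five bundles; with a changing id five readers are created and all of them stay referenced by their caches.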
Re: Debugging External Transforms on Dataflow (Python)
On the surface this looked OK, but running it on Dataflow ended up giving me some new errors.
Essentially, UnboundedSourceAsSDFRestrictionTracker.getProgress also uses currentReader, but it creates new Readers (without checking the cache key).

@Boyuan Zhang I'm guessing this should be part of this PR (https://github.com/apache/beam/pull/13592); do you know if there was a reason behind this?

Thanks!

Cheers
Alex
Re: Debugging External Transforms on Dataflow (Python)
Good news for anybody who's following this.
I finally had some time today to look into the problem again with fresh eyes, and I figured out what was going on.
As I suspected, the issue lies with cachedReaders and how it uses CheckpointMarks.

With the SDF cachedReaders, it serializes the Source and CheckpointMark (along with MIN_TIMESTAMP) to create a cache key for the reader so that it can be reused later. Sounds good so far.

On the other hand, Solace doesn't make use of the CheckpointMark at all in Source.createReader(), opting instead to create a new Reader every time.
This is because Solace doesn't really have a notion of checkpoints in its queues; you always just get the next available message.
This makes sense to me.
As a result, whenever createReader() is called, the resulting CheckpointMark is always unique.

The other point about Solace is that its Reader acknowledges messages only upon finalization (which should be the case).

So far, I'm reiterating what I mentioned before. Well, here's the last part, which I had a hunch about but only had time to confirm today.

While Solace's cached readers do expire after a minute, while they're still running the Solace server helpfully sends some messages (up to 255 -- I could be wrong about this) to the reader first, and then waits for an acknowledgement.
This happens because the FlowReceiver that Solace has isn't yet closed, so the server treats the Reader as still being "active" for all intents and purposes.
These messages, though, never get read by Beam, because Reader.advance() is never called, and they stay that way until the identical CheckpointMark comes up again (which it never does, because the CheckpointMark has moved on).

In the meantime, the SDF spawns more and more readers (which will sooner or later go hungry), and all of these become cached (with some number of unacknowledged messages), because advance() was never called on them.

Eventually, when cachedReaders hits 100 or after 60 seconds, the old Readers, each holding up to 255 unacknowledged messages, will be closed, freeing up the messages to go to the other readers.
But here's the kicker: the other ~99 Readers (in each thread) in the cache are also active in the eyes of Solace!
All of them will get some messages, which never get read, because the CheckpointMark has moved on yet again.
This goes on for eternity, leading to the very small trickle of messages coming in after that.

I've currently fixed the problem by marking everything in Solace's CheckpointMark as transient, so that it always reuses the cached Reader, but I'd like to discuss whether there are any better ways to work around this.
I would propose these two ideas as fallback options, especially considering existing UnboundedSources:
1. make Reader caching optional
2. simply always reuse the existing reader

In any case, the problem (as it was) has been resolved.

Cheers
Alex

On Fri, Jun 18, 2021 at 2:31 AM Luke Cwik wrote:

> Yes I was referring to
> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
> since that is responsible for scheduling the bundle finalization. It will
> only be invoked if the current bundle completes.
>
> I would add logging to ParDoEvaluator#finishBundle[1] to see that the
> bundle is being completed.
> I would add logging to EvaluationContext#handleResult[2] to see how the
> bundle completion is being handled and that the bundle finalization
> callback is being invoked.
>
> 1:
> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L267
> 2:
> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java#L157
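The keying problem in the Jun 24 message can be sketched in a few lines of Python. This is a toy model under stated assumptions, not Beam's Java code: `ReaderCache`, `UniqueMark` and `TransientMark` are hypothetical names, `pickle` stands in for Java serialization, and an empty `__getstate__` stands in for marking fields `transient`. It only illustrates why a checkpoint mark that serializes differently every time defeats a cache keyed on the serialized (source, checkpoint mark) pair.

```python
import itertools
import pickle

_ids = itertools.count()


class UniqueMark:
    """Checkpoint mark whose serialized form differs on every creation."""

    def __init__(self):
        self.reader_id = next(_ids)  # serialized, so every key is unique


class TransientMark:
    """Same mark, but the per-reader field is left out of serialization."""

    def __init__(self):
        self.reader_id = next(_ids)

    def __getstate__(self):
        return {}  # rough analogue of a transient field in Java


class ReaderCache:
    """Hypothetical cache keyed on the serialized (source, mark) pair."""

    def __init__(self):
        self.readers = {}
        self.created = 0

    def get(self, source, mark):
        key = pickle.dumps((source, mark))
        if key not in self.readers:
            self.created += 1
            self.readers[key] = object()
        return self.readers[key]


def readers_created(mark_type, rounds=10):
    cache = ReaderCache()
    for _ in range(rounds):
        # Each round the source hands back a freshly built mark.
        cache.get("solace-queue", mark_type())
    return cache.created


unique_count = readers_created(UniqueMark)
transient_count = readers_created(TransientMark)
```

With unique marks every lookup misses and a new reader is created per round; once the distinguishing field is excluded from serialization, all rounds share one reader, which matches the workaround described in the message.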
Re: Debugging External Transforms on Dataflow (Python)
Could you be referring to this part?
https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563

I've tried fiddling with it and it gave some good results, if I recall correctly.
I didn't mention it earlier because I thought that maybe a shorter expiry actually causes the readers to expire faster, thus releasing the unacked messages for another reader to bundle up.

I can confirm that CheckpointMark#finalizeCheckpoint() doesn't get called for at least some time if the bundle size is not maxed (or close to maxed) out.
I've added logging to finalizeCheckpoint() and don't see it getting called. It's where the acknowledgements happen.

I've actually opened a Google support ticket for this as well; perhaps you could take a look at it (Case 28209335).

Thanks for your reply, I'll try to debug this further too.

On Fri, Jun 18, 2021 at 12:49 AM Luke Cwik wrote:

> Reducing the bundle size/timeout shouldn't be necessary since when the
> UnboundedSource returns false from advance(), the
> UnboundedSourceAsSDFWrapperFn will schedule a bundle finalization and
> return resume for the process continuation. This should cause
> invokeProcessElement() to complete in
> OutputAndTimeBoundedSplittableProcessElementInvoker and the runner specific
> implementation should finish the current bundle. This will allow the runner
> to do two things:
> 1) Finalize the current bundle
> 2) Schedule the continuation for the checkpoint mark
>
> Based upon your description it looks like for some reason the runner is
> unable to complete the current bundle.
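The control flow Luke describes can be sketched as a toy loop. Everything here is hypothetical and heavily simplified: `ListReader`, `process` and `run_bundle` are made-up names, and the string "resume" stands in for the process continuation. The point is only the ordering: acknowledgements happen in the finalization callback, and that callback fires only after the bundle completes.

```python
class ListReader:
    """Hypothetical reader over an in-memory list of messages."""

    def __init__(self, messages):
        self.pending = list(messages)
        self.unacked = []
        self.acked = []
        self._current = None

    def advance(self):
        if not self.pending:
            return False
        self._current = self.pending.pop(0)
        self.unacked.append(self._current)
        return True

    def current(self):
        return self._current

    def finalize_checkpoint(self):
        # Acknowledge everything read since the last finalization.
        self.acked.extend(self.unacked)
        self.unacked.clear()


def process(reader, emit, finalizers):
    """Drain the reader, then schedule finalization and ask to resume."""
    while reader.advance():
        emit(reader.current())
    finalizers.append(reader.finalize_checkpoint)
    return "resume"


def run_bundle(reader):
    outputs, finalizers = [], []
    signal = process(reader, outputs.append, finalizers)
    # The bundle is complete once process() returns; only now do the
    # finalization callbacks run.
    for callback in finalizers:
        callback()
    return signal, outputs


reader = ListReader(["m1", "m2", "m3"])
signal, outputs = run_bundle(reader)
```

If the runner never reaches the point where the callbacks run, the messages stay in `unacked`, which is the situation described in this thread.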
Re: Debugging External Transforms on Dataflow (Python)
Okay, I think I've found the issue, but now I need some help figuring out how to fix it.

1. Solace allows a number of unacknowledged messages before it stops sending more messages. This number just so happens to be 10k messages by default (in the queue I am using).
2. The Solace Beam transform (rightly) waits until bundle finalization before acknowledging the messages.
3. Bundle finalization doesn't happen until it either reaches 10k messages or 10s for the DataflowRunner. For the PortableRunner this seems to be 10k or an unknown timeout. This is related to the OutputAndTimeBoundedSplittableProcessElementInvoker.
4. Many readers are created (over and over) due to the UnboundedSourceAsSDFWrapperFn.
5. When a lot of readers are created, they compete for the messages (in non-exclusive mode), eventually leaving a small number of unacknowledged messages per bundle.
6. The readers are then cached in cachedReaders in the UnboundedSourceAsSDFWrapperFn. A total of 100 readers are cached, and they get evicted after a minute. See https://github.com/apache/beam/pull/13592
7. The readers each have a small number of unacknowledged messages which will remain unacknowledged and cannot be given to another consumer until the bundle finalization happens.
8. When bundle finalization happens (possibly after the reader gets evicted), the messages return to the queue, only to get taken by the huge number of other competing readers.

At this point, I'm guessing the few methods to fix this are:
a. reduce the bundle size / reduce the bundle timeout (which all seem to be hardcoded per runner)
b. reduce the number of cached readers / their timeouts (which don't seem to be customizable either) so that there would be less contention
c. somehow reduce the splitting and instead reuse existing sources over and over

I'd be happy to send pull requests to help fix this issue, but I will perhaps need some direction as to how I should fix it.
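Steps 5 to 8 can be put into rough numbers with a small helper. This is a back-of-the-envelope sketch, not a model of Solace: the even split across competing readers is an assumption made for illustration, and `delivered_vs_stranded` is a hypothetical function name.

```python
def delivered_vs_stranded(messages, cached_readers, active_readers=1):
    """Split a backlog evenly across competing readers (toy model).

    Only the readers that are actually being advanced deliver their
    share; the rest hold theirs unacknowledged until finalization.
    """
    per_reader, remainder = divmod(messages, cached_readers)
    shares = [
        per_reader + (1 if i < remainder else 0)
        for i in range(cached_readers)
    ]
    delivered = sum(shares[:active_readers])
    stranded = sum(shares[active_readers:])
    return delivered, stranded


# 100 cached readers (the cache size mentioned above), one being read:
crowded = delivered_vs_stranded(1000, cached_readers=100)

# A single reused reader gets the whole backlog:
single = delivered_vs_stranded(1000, cached_readers=1)
```

Under these assumptions, `crowded` leaves nearly the whole backlog parked on idle readers, while `single` strands nothing, which is the intuition behind option (c).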
Re: Debugging External Transforms on Dataflow (Python)
Alright, some updates.

Using DirectRunner helped narrow things down quite a bit. It seems that the Solace transform is somewhat buggy when used with the UnboundedSourceAsSDFWrapperFn, as it doesn't have a proper CheckpointMark.
Refer to this:
https://github.com/SolaceProducts/solace-apache-beam/blob/d62f5b8e275902197882e90cdf87346438fae9ac/beam-sdks-java-io-solace/src/main/java/com/solace/connector/beam/UnboundedSolaceSource.java#L40

The source simply creates a new Reader every time createReader() is called.

Because of this, the cachedReaders in the UnboundedSourceAsSDFRestrictionTracker are never purged, so the readers are not closed but stay in the cache.
Changing the timeout causes the pipeline to continue draining, but at a glacial pace.

I've still not been able to isolate the root cause of why it suddenly stops reading more data (it could be a Solace issue though).

Also, trying the easy way out, I've tried running it with 2.24.0 (the last version without the SDF default Read) in Java and it works perfectly. Newer versions on the Java DirectRunner don't work correctly either.
Unfortunately, Dataflow seems to expand the external transform using the SDF Read version even when using 2.24.0 (I'm not entirely sure why this is the case).

I feel like I'm almost on the verge of fixing the problem, but at this point I'm still far from it.
Re: Debugging External Transforms on Dataflow (Python)
1. I'm building a streaming pipeline.
2. For the pure Java transforms pipeline, I believe it got substituted with a Dataflow native Solace transform (it isn't using use_runner_v2, as I think Java doesn't support that publicly yet). I used the default Java flags with a DataflowRunner.
3. I believe it's the source reader that is being created en masse.

I have just tested the Python pipeline (with the Java Solace transform) on the DirectRunner without bounds, and the issue seems to manifest there in a similar way. I'm trying to debug it this way for now.

On Wed, Jun 16, 2021 at 9:01 AM Boyuan Zhang wrote:

> In terms of the odd case you are experiencing, it seems like you are
> comparing a pure java pipeline with a cross-language pipeline, right? I
> want to learn more details on this case:
>
>    - Is this a batch pipeline or a streaming pipeline?
>    - For your pure java transforms pipeline, do you run the pipeline with
>      'use_runner_v2' or 'beam_fn_api' or 'use_unified_worker'?
>    - For a large number of consumers, do you mean dataflow workers or the
>      source reader?
>
> If you can share the implementation of the source and the pipeline, that
> would be really helpful.
>
> +Lukasz Cwik for awareness.
>
> On Tue, Jun 15, 2021 at 9:50 AM Chamikara Jayalath wrote:
>
>> On Tue, Jun 15, 2021 at 3:20 AM Alex Koay wrote:
>>
>>> Several questions:
>>>
>>> 1. Is there any way to set the log level for the Java workers via a
>>> Python Dataflow pipeline?
>>>
>>> 2. What is the easiest way to debug an external transform in Java? My
>>> main pipeline code is in Python.
>>
>> In general, debugging a job should be similar to any other Dataflow job
>> [1]. But some of the SDK options available to the main SDK environment are
>> currently not available to external SDK environments. This includes
>> changing the debug level. So I suggest adding INFO logs instead of changing
>> the debug level if possible.
>>
>> [1]
>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>
>>> 3. Are there any edge cases with the UnboundedSourceWrapperFn SDF that I
>>> should be wary of? I'm currently encountering an odd case (in Dataflow)
>>> where a Java pipeline runs with only one worker all the way reading Solace
>>> messages, but with an external transform in Python, it generates a large
>>> number of consumers and stops reading messages altogether about 90% of the
>>> way.
>>
>> +Boyuan Zhang might be able to help.
>>
>>> Thanks!
>>>
>>> Cheers
>>> Alex
Debugging External Transforms on Dataflow (Python)
Several questions:

1. Is there any way to set the log level for the Java workers via a Python Dataflow pipeline?

2. What is the easiest way to debug an external transform in Java? My main pipeline code is in Python.

3. Are there any edge cases with the UnboundedSourceWrapperFn SDF that I should be wary of? I'm currently encountering an odd case (in Dataflow) where a Java pipeline runs with only one worker all the way reading Solace messages, but with an external transform in Python, it generates a large number of consumers and stops reading messages altogether about 90% of the way.

Thanks!

Cheers
Alex
Re: Issues running Kafka streaming pipeline in Python
Finally figured out the issue.
I can confirm that the kafka_taxi job is working as expected now.

The issue was that I ran the Dataflow job with an invalid experiments flag (runner_v2 instead of use_runner_v2), and I was getting logging messages (on 2.29) saying that I was using Runner V2 even though it seems that I wasn't.
Setting the correct flag fixes the issue (and so I get to see the correctly expanded transforms in the graph).

Thanks for your help Cham!

Cheers
Alex

On Thu, Jun 3, 2021 at 1:07 AM Chamikara Jayalath wrote:

> Can you mention the Job Logs you see in the Dataflow Cloud Console page
> for your job ? Can you also mention the pipeline and configs you used for
> Dataflow (assuming it's different from what's given in the example) ?
> Make sure that you used Dataflow Runner v2 (as given in the example).
> Are you providing null keys by any chance ? There's a known issue related
> to that (but if you are just running the example, it should generate
> appropriate keys).
>
> Unfortunately for actually debugging your job, I need a Dataflow customer
> support ticket <https://cloud.google.com/dataflow/docs/support>.
>
> Thanks,
> Cham
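A mistyped experiment name like the one described above is easy to miss because nothing rejects it. One defensive option, sketched here with the standard library only, is to check the command line against a hand-maintained allow-list before launching. The allow-list and the `unknown_experiments` helper are assumptions for this example (the three names are simply the ones mentioned in these threads); Beam itself does not provide this check as far as this thread shows.

```python
import argparse

# Assumption for this sketch: a hand-maintained allow-list. Extend it with
# the experiments you actually rely on.
KNOWN_EXPERIMENTS = {"use_runner_v2", "beam_fn_api", "use_unified_worker"}


def unknown_experiments(argv):
    """Return experiment names in argv that are not on the allow-list."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--experiments", action="append", default=[])
    # Ignore every other pipeline flag; only --experiments matters here.
    args, _ = parser.parse_known_args(argv)
    # Compare on the name only, in case a value is attached with "=".
    names = {value.split("=")[0] for value in args.experiments}
    return sorted(names - KNOWN_EXPERIMENTS)


bad = unknown_experiments(["--runner=DataflowRunner", "--experiments=runner_v2"])
good = unknown_experiments(["--runner=DataflowRunner", "--experiments=use_runner_v2"])
```

Failing fast on a non-empty result would have surfaced the `runner_v2` typo before the job was submitted.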
Re: Issues running Kafka streaming pipeline in Python
CC-ing Chamikara, as he got omitted from the reply-all I did earlier.
Re: Issues running Kafka streaming pipeline in Python
Yeah, I figured it wasn't supported correctly on DirectRunner. I stumbled upon several threads saying so.

On Dataflow, I've encountered a few different kinds of issues.

1. For the kafka_taxi example, the pipeline would start and the PubSub to Kafka step would run, but nothing gets read from Kafka (the read itself does seem to get expanded, as Dataflow shows KafkaIO.Read + Remove Kafka Metadata sub-transforms).
2. For the snippet I shared earlier, I would vary it with either a "log" transform or a direct "write" back to Kafka. Neither seems to work (and the steps don't get expanded, unlike in the kafka_taxi example). With the "write" step, I believe it didn't get captured in the Dataflow graph a few times.
3. No errors appear in either the Job Logs or the Worker Logs, except for one message emitted from the "log" step, if that happens.

All this is happening while I am producing ~4 messages/sec in Kafka. I can verify that Kafka is working normally, remotely and all (I ran into some issues setting it up).
I've also tested the KafkaIO.read transform in Java and can confirm that it works as expected.

As an aside, I put together an ExternalTransform for MqttIO which you can find here:
https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
I can confirm that it works in batch mode, but given that I couldn't get Kafka to work with Dataflow, I don't have much confidence in getting this to work in streaming mode.

Thanks for your help.

On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath wrote:

> What error did you run into with Dataflow? Did you observe any errors in
> worker logs?
> If you follow the steps given in the example here
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/README.md>
> it should work. Make sure Dataflow workers have access to the Kafka
> bootstrap servers you provide.
>
> Portable DirectRunner currently doesn't support streaming mode, so you need
> to convert your pipeline to a batch pipeline and provide 'max_num_records'
> or 'max_read_time' to convert the Kafka source to a batch source.
> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>
> Also, portable runners (Flink, Spark etc.) have a known issue related to
> SDF checkpointing in streaming mode which results in messages not being
> pushed to subsequent steps. This is tracked in
> https://issues.apache.org/jira/browse/BEAM-11998.
>
> Thanks,
> Cham
>
> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay wrote:
>
>> /cc @Boyuan Zhang for kafka, @Chamikara Jayalath for multi language,
>> might be able to help.
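For anyone following along, Cham's suggestion of bounding the Kafka read could look roughly like the sketch below. It assumes the `max_num_records` and `max_read_time` keyword arguments of the Python `ReadFromKafka` wrapper; the helper names `bounded_kafka_read_kwargs` and `run_batch_read`, the record limit of 10, and the broker address are made up for illustration. Running the pipeline itself still needs a reachable Kafka broker and a Java environment for the expansion service.

```python
import logging


def bounded_kafka_read_kwargs(bootstrap_servers, topic,
                              max_num_records=None, max_read_time=None):
    """Build keyword arguments for ReadFromKafka that make the read bounded.

    Hypothetical helper: at least one limit must be given, otherwise the
    source stays unbounded and needs a streaming-capable runner.
    """
    if max_num_records is None and max_read_time is None:
        raise ValueError("set max_num_records or max_read_time for a bounded read")
    kwargs = {
        "consumer_config": {"bootstrap.servers": bootstrap_servers},
        "topics": [topic],
    }
    if max_num_records is not None:
        kwargs["max_num_records"] = max_num_records
    if max_read_time is not None:
        kwargs["max_read_time"] = max_read_time  # seconds
    return kwargs


def run_batch_read(bootstrap_servers="localhost:9092", topic="topic"):
    # Imported lazily so the helper above can be checked without Beam installed.
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    kwargs = bounded_kafka_read_kwargs(bootstrap_servers, topic, max_num_records=10)
    # No --streaming flag: with a record limit the read runs as a batch job.
    with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
        (
            p
            | "read" >> ReadFromKafka(**kwargs)
            | "log" >> beam.Map(lambda kv: logging.warning("%s", kv))
        )
```

Calling `run_batch_read()` should stop after the limit is reached, which at least separates "the expansion works" from "streaming works on this runner".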
Issues running Kafka streaming pipeline in Python
Hi all,

I have created a simple snippet as such:

    import logging

    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    logging.basicConfig(level=logging.WARNING)

    # Options for a local streaming run on the DirectRunner.
    opts = PipelineOptions(["--runner=DirectRunner", "--streaming"])
    with beam.Pipeline(options=opts) as p:
        (
            p
            # Read (key, value) records from a local single-node Kafka.
            | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"}, topics=["topic"])
            # Log every record that makes it past the read step.
            | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
        )

I've set up a single-node Kafka similar to the kafka_taxi README, and run this on both DirectRunner and DataflowRunner, but it doesn't work. What I mean by this is that the transform seems to be capturing data, but doesn't pass it on to subsequent transforms.
With DirectRunner, if I send a non-keyed Kafka message to the server it actually crashes (saying that it cannot encode null into a byte[]), which is why I believe the transform is actually running.

My main objective really is to create a streaming ExternalTransform for MqttIO and SolaceIO (https://github.com/SolaceProducts/solace-apache-beam).
I've implemented the builders and registrars and they work in batch mode (with maxNumRecords), but otherwise they fail to read.

With MqttIO, the streaming transform gets stuck waiting for one bundle to complete (if I continuously send messages into the MQTT server), and after I stop sending, the bundles finish but nothing gets passed on either.

I appreciate any help I can get with this.
Thanks!

Cheers
Alex
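One small thing worth ruling out when testing this: the lambda in the "log" step returns None, so that step never emits anything downstream even when it does receive records. This does not explain the missing reads, but a pass-through logger makes it easier to chain a "write" or further steps after it. A minimal sketch, assuming records arrive as (key, value) tuples, which is what ReadFromKafka produces when metadata is not requested; `log_and_pass` and `attach_logging` are hypothetical names.

```python
import logging


def log_and_pass(record):
    """Log one Kafka record and return it unchanged so later steps still see it."""
    key, value = record
    logging.warning("key=%r value=%r", key, value)
    return record


def attach_logging(records):
    # Imported lazily so log_and_pass can be checked without Beam installed.
    import apache_beam as beam

    # beam.Map emits exactly one output per input: the value log_and_pass returns.
    return records | "log" >> beam.Map(log_and_pass)
```

With this, `attach_logging(p | "read" >> ReadFromKafka(...))` returns a PCollection that can still be piped into a write step.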