Re: When to use Pipeline object

2021-07-11 Thread Alex Koay
From my understanding, you need the Pipeline object for mainly two things:
1. Marking the start of any processing flows (it serves as the PBegin
"PCollection"), so any sources that follow it will run.
2. Running / executing / deploying the pipeline -- this happens
automatically with the context manager in your example, but otherwise you
can call pipeline.run() to get the same effect.
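
A minimal sketch of both forms (using a trivial Create/Map pipeline purely
for illustration):

import apache_beam as beam

# Form 1: the context manager calls run() and waits for completion on exit.
with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create([1, 2, 3])
        | beam.Map(print)
    )

# Form 2: explicit run(), handy if you want to keep the PipelineResult handle.
pipeline = beam.Pipeline()
(
    pipeline
    | beam.Create([1, 2, 3])
    | beam.Map(print)
)
result = pipeline.run()
result.wait_until_finish()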

On Mon, Jul 12, 2021 at 10:04 AM  wrote:

> Hi,
>
>
> When using the Python SDK I'm a little confused as to when the Pipeline
> object is actually needed. I gather one needs it initially to create a
> PCollection, just because this is where I most often see it used, e.g.:
>
>
> with beam.Pipeline() as pipeline:
>     dict_pc = (
>         pipeline
>         | beam.io.fileio.MatchFiles("./*.csv")
>         | 'Read matched files' >> beam.io.fileio.ReadMatches()
>         | 'Get CSV data as a dict' >> beam.FlatMap(my_csv_reader)
>     )
>
>     # do stuff with dict_pc and other operations
>
>
> But beyond this, when does one need the Pipeline object? It seems like the
> transforms expect a PCollection and output a PCollection, so I'm confused
> and not finding documentation that addresses this. Thank you.
>
>
>


DataflowRunner with External Transform creates UnboundedSourceAsSDFWrapperFn repeatedly

2021-07-11 Thread Alex Koay
Can any Dataflow experts / Googlers enlighten me as to why this happens?
I shimmed the FnHarness for a Python pipeline with a Java external
transform, and it seems that the ProcessBundleHandler receives different
process-bundle-descriptor-%d for the same processor.
This leads to the system defeating the cachedReaders setup (and the
previous cachedReaders never get evicted -- from what I've seen, their
readers stay alive).
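
My rough mental model of what's going on, as a conceptual Python sketch
(the descriptor-keyed processor cache here is an assumption on my part,
not the actual Java code):

# Hypothetical model: each process-bundle-descriptor id gets its own
# bundle processor, and each processor carries its own reader cache.
processors = {}  # descriptor id -> {"cached_readers": {...}}

def get_processor(descriptor_id):
    # A previously unseen descriptor id yields a brand-new processor with an
    # empty reader cache; readers held by older processors are never reused
    # (and never evicted or closed until those processors are torn down).
    return processors.setdefault(descriptor_id, {"cached_readers": {}})

p1 = get_processor("process-bundle-descriptor-1")
p2 = get_processor("process-bundle-descriptor-2")  # same work, different id
assert p1 is not p2  # p1's cached readers are effectively orphaned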

(The story about the Solace source continues...)

Thanks
Alex


Re: Debugging External Transforms on Dataflow (Python)

2021-06-24 Thread Alex Koay
On the surface this looked ok, but running it on Dataflow ended up giving
me some new errors.
Essentially UnboundedSourceAsSDFRestrictionTracker.getProgress also uses
currentReader, but it creates new Readers (without checking the cache key).

@Boyuan Zhang  I'm guessing this should be part of this
PR (https://github.com/apache/beam/pull/13592) -- do you know if there was
a reason behind this?

Thanks!

Cheers
Alex

On Thu, Jun 24, 2021 at 4:01 PM Alex Koay  wrote:

> Good news for anybody who's following this.
> I finally had some time today to look into the problem again with some
> fresh eyes and I figured out the problems.
> As I suspected, the issue lies with the cachedReaders and how it uses
> CheckpointMarks.
>
> With the SDF cachedReaders, it serializes the Source and CheckpointMark
> (along with MIN_TIMESTAMP) to create a cache key for the reader so that it
> can be reused later. Sounds good so far.
>
> On the other hand, Solace doesn't make use of the CheckpointMark at all in
> Source.createReader(), opting instead to create a new Reader every time.
> This is because Solace doesn't really have a notion of checkpoints in
> their queue, you just get the next available message always.
> This makes sense to me.
> As a result, whenever createReader() is called, the CheckpointMark that
> comes as a result is always unique.
>
> The other point about Solace is that its Reader acknowledges the messages
> only upon finalization (which should be the case).
>
> So far, I'm reiterating what I mentioned before. Well, here's the last
> part I had a hunch about but finally had time to confirm today.
>
> While Solace's cached readers do expire after a minute, for as long as
> they're still alive the Solace server helpfully sends some messages (up to
> 255 -- I could be wrong about this) to the reader, and then waits for an
> acknowledgement.
> This happens because the FlowReceiver that Solace holds isn't yet closed,
> so the server treats the Reader as still being "active" for all intents
> and purposes.
> These messages, though, never get read at all by Beam, because
> Reader.advance() is never called, and they stay that way until the
> identical CheckpointMark is recalled (which never happens, because the
> CheckpointMark has moved on).
>
> In the meantime, the SDF spawns more and more readers over and over again
> (which will sooner or later go hungry) and all these will become cached
> (with some number of unacknowledged messages), because advance() was never
> called on them.
>
> Eventually, when cachedReaders hits 100 entries or after 60 seconds, the
> old Readers, which each hold up to 255 unacknowledged messages, will be
> closed, freeing up the messages to go to the other readers.
> But here's the kicker: the other ~99 Readers (in each thread) in the cache
> are also active in the eyes of Solace!
> All of them will get some messages, which never get read, because the
> CheckpointMark has moved on yet again.
> This goes on forever, leading to the very small trickle of messages
> coming in after that.
>
> I've currently fixed the problem by marking everything in Solace's
> CheckpointMark as transient, and as a result it will always reuse the
> cached Reader, but I'd like to discuss if there are any better ways to work
> around this.
> I would propose these two ideas as fallback options, especially
> considering existing UnboundedSources.
> 1. make Reader caching optional
> 2. simply always reuse the existing reader
>
> In any case, the problem (as it was) has been resolved.
>
> Cheers
> Alex
>
>
> On Fri, Jun 18, 2021 at 2:31 AM Luke Cwik  wrote:
>
>> Yes I was referring to
>> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
>>  since
>> that is responsible for scheduling the bundle finalization. It will only be
>> invoked if the current bundle completes.
>>
>> I would add logging to ParDoEvaluator#finishBundle[1] to see that the
>> bundle is being completed.
>> I would add logging to EvaluationContext#handleResult[2] to see how the
>> bundle completion is being handled and that the bundle finalization
>> callback is being invoked.
>>
>> 1:
>> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L267
>> 2:
>> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java#L157
>>
>> On Thu, Jun 17, 2021 at 10:42 

Re: Debugging External Transforms on Dataflow (Python)

2021-06-24 Thread Alex Koay
Good news for anybody who's following this.
I finally had some time today to look into the problem again with some
fresh eyes and I figured out the problems.
As I suspected, the issue lies with the cachedReaders and how it uses
CheckpointMarks.

With the SDF cachedReaders, it serializes the Source and CheckpointMark
(along with MIN_TIMESTAMP) to create a cache key for the reader so that it
can be reused later. Sounds good so far.

On the other hand, Solace doesn't make use of the CheckpointMark at all in
Source.createReader(), opting instead to create a new Reader every time.
This is because Solace doesn't really have a notion of checkpoints in its
queue; you always just get the next available message.
This makes sense to me.
As a result, the CheckpointMark that comes out of each createReader() call
is always unique.
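
To make the interaction concrete, here's a conceptual Python sketch of the
caching logic as I understand it (the real code is the Java
UnboundedSourceAsSDFWrapperFn; the names below are made up):

import pickle

reader_cache = {}  # serialized (source, checkpoint mark) -> reader

def get_or_create_reader(source, checkpoint_mark):
    # The cache key is derived from the serialized Source and CheckpointMark,
    # so a reader is reused only on an exact match.
    key = pickle.dumps((source, checkpoint_mark))
    if key in reader_cache:
        return reader_cache[key]
    reader = source.create_reader(checkpoint_mark)
    reader_cache[key] = reader
    return reader

# Because Solace ignores the CheckpointMark and every mark it produces is
# unique, the key never repeats: every call creates (and caches) yet another
# reader instead of reusing the previous one.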

The other point about Solace is that its Reader acknowledges the messages
only upon finalization (which should be the case).

So far, I'm reiterating what I mentioned before. Well, here's the last
part I had a hunch about but finally had time to confirm today.

While Solace's cached readers do expire after a minute, for as long as
they're still alive the Solace server helpfully sends some messages (up to
255 -- I could be wrong about this) to the reader, and then waits for an
acknowledgement.
This happens because the FlowReceiver that Solace holds isn't yet closed,
so the server treats the Reader as still being "active" for all intents
and purposes.
These messages, though, never get read at all by Beam, because
Reader.advance() is never called, and they stay that way until the
identical CheckpointMark is recalled (which never happens, because the
CheckpointMark has moved on).

In the meantime, the SDF spawns more and more readers over and over again
(which will sooner or later go hungry) and all these will become cached
(with some number of unacknowledged messages), because advance() was never
called on them.

Eventually, when cachedReaders hits 100 entries or after 60 seconds, the
old Readers, which each hold up to 255 unacknowledged messages, will be
closed, freeing up the messages to go to the other readers.
But here's the kicker: the other ~99 Readers (in each thread) in the cache
are also active in the eyes of Solace!
All of them will get some messages, which never get read, because the
CheckpointMark has moved on yet again.
This goes on forever, leading to the very small trickle of messages coming
in after that.

I've currently fixed the problem by marking everything in Solace's
CheckpointMark as transient, and as a result it will always reuse the
cached Reader, but I'd like to discuss if there are any better ways to work
around this.
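
For reference, here's the effect of that transient trick as a conceptual
Python sketch (the actual fix is transient fields on the Java
CheckpointMark; this just shows why a constant serialized form keeps the
cache key stable):

import pickle

class StableCheckpointMark:
    def __init__(self, pending_messages):
        self.pending_messages = pending_messages  # per-instance state

    def __getstate__(self):
        # Analogue of marking the fields transient in Java: the volatile
        # state is excluded from serialization, so every instance
        # serializes to the same bytes.
        return {}

a = StableCheckpointMark(["msg-1"])
b = StableCheckpointMark(["msg-2", "msg-3"])
assert pickle.dumps(a) == pickle.dumps(b)  # same cache key -> reader reused
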
I would propose these two ideas as fallback options, especially considering
existing UnboundedSources.
1. make Reader caching optional
2. simply always reuse the existing reader

In any case, the problem (as it was) has been resolved.

Cheers
Alex


On Fri, Jun 18, 2021 at 2:31 AM Luke Cwik  wrote:

> Yes I was referring to
> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
>  since
> that is responsible for scheduling the bundle finalization. It will only be
> invoked if the current bundle completes.
>
> I would add logging to ParDoEvaluator#finishBundle[1] to see that the
> bundle is being completed.
> I would add logging to EvaluationContext#handleResult[2] to see how the
> bundle completion is being handled and that the bundle finalization
> callback is being invoked.
>
> 1:
> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L267
> 2:
> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java#L157
>
> On Thu, Jun 17, 2021 at 10:42 AM Alex Koay  wrote:
>
>> Could you be referring to this part?
>> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
>>
>> I've tried fiddling with it and it gave some good results if I recall
>> correctly.
>> I didn't mention it earlier because I thought that maybe a shorter expiry
>> actually causes the readers to expire faster and thus releasing the unacked
>> messages for another reader to bundle up.
>>
>> I can confirm that the CheckpointMark#finalizeCheckpoint() doesn't get
>> called for at least some time if the bundle size is not maxed (or close to
>> maxed) out.
>> I've added logging into finalizeCheckpoint() and don't see it getting
>> called. It's 

Re: Debugging External Transforms on Dataflow (Python)

2021-06-17 Thread Alex Koay
Could you be referring to this part?
https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563

I've tried fiddling with it and it gave some good results if I recall
correctly.
I didn't mention it earlier because I thought that maybe a shorter expiry
actually causes the readers to expire faster, thus releasing the unacked
messages for another reader to bundle up.

I can confirm that the CheckpointMark#finalizeCheckpoint() doesn't get
called for at least some time if the bundle size is not maxed (or close to
maxed) out.
I've added logging into finalizeCheckpoint() and don't see it getting
called. It's where the acknowledgements are happening.

I've actually opened a Google support ticket for this as well, perhaps you
could take a look at it (Case 28209335).
Thanks for your reply, I'll try to debug this further too.

On Fri, Jun 18, 2021 at 12:49 AM Luke Cwik  wrote:

> Reducing the bundle size/timeout shouldn't be necessary since when the
> UnboundedSource returns false from advance(), the
> UnboundedSourceAsSDFWrapperFn will schedule a bundle finalization and
> return resume for the process continuation. This should cause
> invokeProcessElement() to complete in
> OutputAndTimeBoundedSplittableProcessElementInvoker and the runner specific
> implementation should finish the current bundle. This will allow the runner
> to do two things:
> 1) Finalize the current bundle
> 2) Schedule the continuation for the checkpoint mark
>
> Based upon your description it looks like for some reason the runner is
> unable to complete the current bundle.
>
> On Thu, Jun 17, 2021 at 2:48 AM Alex Koay  wrote:
>
>> Okay, I think I've found the issue, but now I need some help figuring out
>> how to fix the issue.
>>
>> 1. Solace allows a number of unacknowledged messages before it stops
>> sending more messages. This number just so happens to be 10k messages by
>> default (in the queue I am using).
>> 2. The Solace Beam transform (rightly) waits until bundle finalization
>> before acknowledging the messages.
>> 3. Bundle finalization doesn't happen until it either reaches 10k
>> messages or 10s for the DataflowRunner. For the PortableRunner this seems
>> to be 10k or an unknown timeout. This is related to the
>> OutputAndTimeBoundedSplittableProcessElementInvoker.
>> 4. Many readers are created (over and over) due to the
>> UnboundedSourceAsSdfWrapperFn.
>> 5. When a lot of readers are created, they would compete for the messages
>> (in non-exclusive mode), eventually leaving a small number of
>> unacknowledged messages per bundle.
>> 6. The readers are then cached in cachedReaders in the
>> UnboundedSourceAsSdfWrapperFn. A total of 100 readers are cached, and get
>> evicted after a minute. See https://github.com/apache/beam/pull/13592
>> 7. The readers each have a small number of unacknowledged messages which
>> will remain unacknowledged and cannot be given to another consumer until
>> the bundle finalization happens.
>> 8. When bundle finalization happens (possibly after the reader gets
>> evicted), the messages return to the queue, only to get taken by the huge
>> number of other competing readers.
>>
>> At this point, I'm guessing the few methods to fix this are:
>> a. reduce the bundle size / reduce the bundle timeout (which all seem to
>> be hardcoded per runner)
>> b. reduce the number of cached readers / their timeouts (which doesn't
>> seem to be customizable either) so that there would be less contention
>> c. somehow reduce the splitting process and instead reusing existing
>> sources over and over
>>
>> I'd be happy to send pull requests to help fix this issue but perhaps
>> will need some direction as to how I should fix this.
>>
>> On Wed, Jun 16, 2021 at 8:32 PM Alex Koay  wrote:
>>
>>> Alright, some updates.
>>>
>>> Using DirectRunner helped narrow things down quite a bit. It seems that
>>> the Solace transform is somewhat buggy when used with the
>>> UnboundedSourceAsSDFWrapperFn as it doesn't have a proper CheckpointMark.
>>> Refer to this:
>>> https://github.com/SolaceProducts/solace-apache-beam/blob/d62f5b8e275902197882e90cdf87346438fae9ac/beam-sdks-java-io-solace/src/main/java/com/solace/connector/beam/UnboundedSolaceSource.java#L40
>>>
>>> The source simply creates a new Reader every time createReader() is
>>> called.
>>>
>>> Because of these, the cachedReaders in the
>>> UnboundedSourceAsSDFRestrictionTracker are never purged, resulting in
>

Re: Debugging External Transforms on Dataflow (Python)

2021-06-17 Thread Alex Koay
Okay, I think I've found the issue, but now I need some help figuring out
how to fix the issue.

1. Solace allows a number of unacknowledged messages before it stops
sending more messages. This number just so happens to be 10k messages by
default (in the queue I am using).
2. The Solace Beam transform (rightly) waits until bundle finalization
before acknowledging the messages.
3. Bundle finalization doesn't happen until it either reaches 10k messages
or 10s for the DataflowRunner. For the PortableRunner this seems to be 10k
or an unknown timeout. This is related to the
OutputAndTimeBoundedSplittableProcessElementInvoker.
4. Many readers are created (over and over) due to the
UnboundedSourceAsSdfWrapperFn.
5. When a lot of readers are created, they would compete for the messages
(in non-exclusive mode), eventually leaving a small number of
unacknowledged messages per bundle.
6. The readers are then cached in cachedReaders in the
UnboundedSourceAsSdfWrapperFn. A total of 100 readers are cached, and get
evicted after a minute. See https://github.com/apache/beam/pull/13592
7. The readers each have a small number of unacknowledged messages which
will remain unacknowledged and cannot be given to another consumer until
the bundle finalization happens.
8. When bundle finalization happens (possibly after the reader gets
evicted), the messages return to the queue, only to get taken by the huge
number of other competing readers.
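
To put rough, purely illustrative numbers on it: with the 10k unacked limit
shared across, say, 100 competing readers, each bundle only ever sees on the
order of 100 messages -- nowhere near the 10k finalization threshold -- so
acknowledgement effectively waits on the time bound while the messages sit
unacked:

# Illustrative arithmetic only; the reader count is a made-up assumption.
unacked_limit = 10_000       # Solace's default per-queue unacked message cap
competing_readers = 100      # assumed number of readers fighting for messages
finalize_threshold = 10_000  # elements needed to trigger early finalization

per_reader = unacked_limit // competing_readers  # ~100 messages each
assert per_reader < finalize_threshold           # bundles never hit the threshold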

At this point, I'm guessing the few methods to fix this are:
a. reduce the bundle size / reduce the bundle timeout (which all seem to be
hardcoded per runner)
b. reduce the number of cached readers / their timeouts (which doesn't seem
to be customizable either) so that there would be less contention
c. somehow reduce the splitting process and instead reuse existing
sources over and over

I'd be happy to send pull requests to help fix this issue but perhaps will
need some direction as to how I should fix this.

On Wed, Jun 16, 2021 at 8:32 PM Alex Koay  wrote:

> Alright, some updates.
>
> Using DirectRunner helped narrow things down quite a bit. It seems that
> the Solace transform is somewhat buggy when used with the
> UnboundedSourceAsSDFWrapperFn as it doesn't have a proper CheckpointMark.
> Refer to this:
> https://github.com/SolaceProducts/solace-apache-beam/blob/d62f5b8e275902197882e90cdf87346438fae9ac/beam-sdks-java-io-solace/src/main/java/com/solace/connector/beam/UnboundedSolaceSource.java#L40
>
> The source simply creates a new Reader every time createReader() is called.
>
> Because of this, the cachedReaders in the
> UnboundedSourceAsSDFRestrictionTracker are never purged, resulting in
> readers not being closed but instead staying in the cache.
> Changing the timeout causes the pipeline to continue draining but at a
> glacial pace.
>
> I still haven't been able to isolate the root cause of why it suddenly
> stops reading more data (could be a Solace issue though).
>
>
> Also, trying the easy way out, I've tried running it with 2.24.0 (the last
> one without the SDF default Read) in Java and it works perfectly. Newer
> versions in Java DirectRunner don't work correctly either.
> Unfortunately Dataflow seems to expand the external transform using the
> SDF Read version even when using 2.24.0 (I'm not entirely sure why this is
> the case).
>
> I feel like I'm almost at the verge of fixing the problem, but at this
> point I'm still far from it.
>
>
> On Wed, Jun 16, 2021 at 11:24 AM Alex Koay  wrote:
>
>> 1. I'm building a streaming pipeline.
>> 2. For the pure Java transforms pipeline I believe it got substituted
>> with a Dataflow native Solace transform (it isn't using use_runner_v2 as I
>> think Java doesn't support that publicly yet). I used the default Java
>> flags with a DataflowRunner.
>> 3. I believe it's the source reader that is being created en masse.
>>
>> Currently I just tested the Python pipeline (with Java Solace transform)
>> on the DirectRunner without bounds, and it seems that the issue is
>> similarly manifesting. I'm trying to debug it this way for now.
>>
>> On Wed, Jun 16, 2021 at 9:01 AM Boyuan Zhang  wrote:
>>
>>> In terms of the odd case you are experiencing,  it seems like you are
>>> comparing a pure java pipeline with a cross-language pipeline, right? I
>>> want to learn more details on this case:
>>>
>>>- Is this a batch pipeline or a streaming pipeline?
>>>- For your pure java transforms pipeline, do you run the pipeline
>>>with 'use_runner_v2' or 'beam_fn_api' or 'use_unified_worker'?
>>>    - For a large number of consumers, do you mean dataflow workers or
>>>the source reader?

Re: Debugging External Transforms on Dataflow (Python)

2021-06-16 Thread Alex Koay
Alright, some updates.

Using DirectRunner helped narrow things down quite a bit. It seems that the
Solace transform is somewhat buggy when used with the
UnboundedSourceAsSDFWrapperFn as it doesn't have a proper CheckpointMark.
Refer to this:
https://github.com/SolaceProducts/solace-apache-beam/blob/d62f5b8e275902197882e90cdf87346438fae9ac/beam-sdks-java-io-solace/src/main/java/com/solace/connector/beam/UnboundedSolaceSource.java#L40

The source simply creates a new Reader every time createReader() is called.

Because of this, the cachedReaders in the
UnboundedSourceAsSDFRestrictionTracker are never purged, resulting in
readers not being closed but instead staying in the cache.
Changing the timeout causes the pipeline to continue draining but at a
glacial pace.

I still haven't been able to isolate the root cause of why it suddenly
stops reading more data (could be a Solace issue though).


Also, trying the easy way out, I've tried running it with 2.24.0 (the last
one without the SDF default Read) in Java and it works perfectly. Newer
versions in Java DirectRunner don't work correctly either.
Unfortunately Dataflow seems to expand the external transform using the SDF
Read version even when using 2.24.0 (I'm not entirely sure why this is the
case).

I feel like I'm almost on the verge of fixing the problem, but at this
point I'm still far from it.


On Wed, Jun 16, 2021 at 11:24 AM Alex Koay  wrote:

> 1. I'm building a streaming pipeline.
> 2. For the pure Java transforms pipeline I believe it got substituted with
> a Dataflow native Solace transform (it isn't using use_runner_v2 as I think
> Java doesn't support that publicly yet). I used the default Java flags with
> a DataflowRunner.
> 3. I believe it's the source reader that is being created en masse.
>
> Currently I just tested the Python pipeline (with Java Solace transform)
> on the DirectRunner without bounds, and it seems that the issue is
> similarly manifesting. I'm trying to debug it this way for now.
>
> On Wed, Jun 16, 2021 at 9:01 AM Boyuan Zhang  wrote:
>
>> In terms of the odd case you are experiencing,  it seems like you are
>> comparing a pure java pipeline with a cross-language pipeline, right? I
>> want to learn more details on this case:
>>
>>- Is this a batch pipeline or a streaming pipeline?
>>- For your pure java transforms pipeline, do you run the pipeline
>>with 'use_runner_v2' or 'beam_fn_api' or 'use_unified_worker'?
>>- For a large number of consumers, do you mean dataflow workers or
>>the source reader?
>>
>> If you can share the implementation of the source and the pipeline, that
>> would be really helpful.
>>
>> +Lukasz Cwik  for awareness.
>>
>> On Tue, Jun 15, 2021 at 9:50 AM Chamikara Jayalath 
>> wrote:
>>
>>>
>>>
>>> On Tue, Jun 15, 2021 at 3:20 AM Alex Koay  wrote:
>>>
>>>> Several questions:
>>>>
>>>> 1. Is there any way to set the log level for the Java workers via a
>>>> Python Dataflow pipeline?
>>>>
>>>
>>>> 2. What is the easiest way to debug an external transform in Java? My
>>>> main pipeline code is in Python.
>>>>
>>>
>>> In general, debugging a job should be similar to any other Dataflow job
>>> [1]. But some of the SDK options available to the main SDK environment are
>>> currently not available to external SDK environments. This includes
>>> changing the debug level. So I suggest adding INFO logs instead of changing
>>> the debug level if possible.
>>>
>>> [1]
>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>
>>>
>>>>
>>>> 3. Are there any edge cases with the UnboundedSourceWrapperFn SDF that
>>>> I should be wary of? I'm currently encountering an odd case (in Dataflow)
>>>> where a Java pipeline runs with only one worker all the way while reading
>>>> Solace messages, but with an external transform in Python, it generates a
>>>> large number of consumers and stops reading messages altogether about 90%
>>>> of the way.
>>>>
>>>
>>> +Boyuan Zhang  might be able to help.
>>>
>>>
>>>> Thanks!
>>>>
>>>> Cheers
>>>> Alex
>>>>
>>>


Re: Debugging External Transforms on Dataflow (Python)

2021-06-15 Thread Alex Koay
1. I'm building a streaming pipeline.
2. For the pure Java transforms pipeline I believe it got substituted with
a Dataflow native Solace transform (it isn't using use_runner_v2 as I think
Java doesn't support that publicly yet). I used the default Java flags with
a DataflowRunner.
3. I believe it's the source reader that is being created en masse.

Currently I just tested the Python pipeline (with Java Solace transform) on
the DirectRunner without bounds, and it seems that the issue is similarly
manifesting. I'm trying to debug it this way for now.

On Wed, Jun 16, 2021 at 9:01 AM Boyuan Zhang  wrote:

> In terms of the odd case you are experiencing,  it seems like you are
> comparing a pure java pipeline with a cross-language pipeline, right? I
> want to learn more details on this case:
>
>- Is this a batch pipeline or a streaming pipeline?
>- For your pure java transforms pipeline, do you run the pipeline with
>'use_runner_v2' or 'beam_fn_api' or 'use_unified_worker'?
>- For a large number of consumers, do you mean dataflow workers or the
>source reader?
>
> If you can share the implementation of the source and the pipeline, that
> would be really helpful.
>
> +Lukasz Cwik  for awareness.
>
> On Tue, Jun 15, 2021 at 9:50 AM Chamikara Jayalath 
> wrote:
>
>>
>>
>> On Tue, Jun 15, 2021 at 3:20 AM Alex Koay  wrote:
>>
>>> Several questions:
>>>
>>> 1. Is there any way to set the log level for the Java workers via a
>>> Python Dataflow pipeline?
>>>
>>
>>> 2. What is the easiest way to debug an external transform in Java? My
>>> main pipeline code is in Python.
>>>
>>
>> In general, debugging a job should be similar to any other Dataflow job
>> [1]. But some of the SDK options available to the main SDK environment are
>> currently not available to external SDK environments. This includes
>> changing the debug level. So I suggest adding INFO logs instead of changing
>> the debug level if possible.
>>
>> [1]
>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>
>>
>>>
>>> 3. Are there any edge cases with the UnboundedSourceWrapperFn SDF that I
>>> should be wary of? I'm currently encountering an odd case (in Dataflow)
>>> where a Java pipeline runs with only one worker all the way while reading
>>> Solace messages, but with an external transform in Python, it generates a
>>> large number of consumers and stops reading messages altogether about 90%
>>> of the way.
>>>
>>
>> +Boyuan Zhang  might be able to help.
>>
>>
>>> Thanks!
>>>
>>> Cheers
>>> Alex
>>>
>>


Debugging External Transforms on Dataflow (Python)

2021-06-15 Thread Alex Koay
Several questions:

1. Is there any way to set the log level for the Java workers via a Python
Dataflow pipeline?

2. What is the easiest way to debug an external transform in Java? My main
pipeline code is in Python.

3. Are there any edge cases with the UnboundedSourceWrapperFn SDF that I
should be wary of? I'm currently encountering an odd case (in Dataflow)
where a Java pipeline runs with only one worker all the way while reading
Solace messages, but with an external transform in Python, it generates a
large number of consumers and stops reading messages altogether about 90%
of the way.

Thanks!

Cheers
Alex


Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Alex Koay
Finally figured out the issue.
Can confirm that the kafka_taxi job is working as expected now.
The issue was that I ran the Dataflow job with an invalid experiments flag
(runner_v2 instead of use_runner_v2), and I was getting logging messages
(on 2.29) that said that I was using Runner V2 even though it seems that I
wasn't.
Setting the correct flag fixes the issue (and so I get to see the correctly
expanded transforms in the graph).
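
For anyone else hitting this, the working options looked roughly like the
following (project/region/bucket values are placeholders):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=DataflowRunner",
    "--streaming",
    "--experiments=use_runner_v2",  # not "runner_v2" -- that did nothing
    "--project=my-project",                # placeholder
    "--region=us-central1",                # placeholder
    "--temp_location=gs://my-bucket/tmp",  # placeholder
])
# ...then construct beam.Pipeline(options=options) as usual.
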
Thanks for your help Cham!

Cheers
Alex

On Thu, Jun 3, 2021 at 1:07 AM Chamikara Jayalath 
wrote:

> Can you mention the Job Logs you see in the Dataflow Cloud Console page
> for your job ? Can you also mention the pipeline and configs you used for
> Dataflow (assuming it's different from what's given in the example) ?
> Make sure that you used Dataflow Runner v2 (as given in the example).
> Are you providing null keys by any chance ? There's a known issue related
> to that (but if you are just running the example, it should generate
> appropriate keys).
>
> Unfortunately, for actually debugging your job I need a Dataflow customer
> support ticket <https://cloud.google.com/dataflow/docs/support>.
>
> Thanks,
> Cham
>
> On Wed, Jun 2, 2021 at 9:45 AM Alex Koay  wrote:
>
>> CC-ing Chamikara as he got omitted from the reply all I did earlier.
>>
>> On Thu, Jun 3, 2021 at 12:43 AM Alex Koay  wrote:
>>
>>> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
>>> upon several threads saying so.
>>>
>>> On Dataflow, I've encountered a few different kinds of issues.
>>> 1. For the kafka_taxi example, the pipeline would start, the PubSub to
>>> Kafka would run, but nothing gets read from Kafka (this seems to get
>>> expanded as Dataflow shows KafkaIO.Read + Remove Kafka Metadata
>>> sub-transforms).
>>> 2. For the snippet I shared above, I would vary it either with a "log"
>>> transform or a direct "write" back to Kafka. Neither seems to work (and the
>>> steps don't get expanded unlike the kafka_taxi example). With the "write"
>>> step, I believe it didn't get captured in the Dataflow graph a few times.
>>> 3. No errors appear in both Job Logs and Worker Logs, except for one
>>> message emitted from the "log" step if that happens.
>>>
>>> All this is happening while I am producing ~4 messages/sec in Kafka. I
>>> can verify that Kafka is working normally remotely and all (ran into some
>>> issues setting it up).
>>> I've also tested the KafkaIO.read transform in Java and can confirm that
>>> it works as expected.
>>>
>>> As an aside, I put together an ExternalTransform for MqttIO which you
>>> can find here:
>>> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
>>> I can confirm that it works in batch mode, but given that I couldn't get
>>> Kafka to work with Dataflow, I don't have much confidence in getting this
>>> to work.
>>>
>>> Thanks for your help.
>>>
>>> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath 
>>> wrote:
>>>
>>>> What error did you run into with Dataflow ? Did you observe any errors
>>>> in worker logs ?
>>>> If you follow the steps given in the example here
>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/README.md>
>>>> it should work. Make sure Dataflow workers have access to Kafka bootstrap
>>>> servers you provide.
>>>>
>>>> Portable DirectRunner currently doesn't support streaming mode so you
>>>> need to convert your pipeline to a batch pipeline and provide
>>>> 'max_num_records' or 'max_read_time' to convert the Kafka source to a batch
>>>> source.
>>>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>>>>
>>>> Also portable runners (Flink, Spark etc.) have a known issue related to
>>>> SDF checkpointing in streaming mode which results in messages not being
>>>> pushed to subsequent steps. This is tracked in
>>>> https://issues.apache.org/jira/browse/BEAM-11998.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:
>>>>
>>>>> /cc @Boyuan Zhang  for kafka @Chamikara Jayalath
>>>>>  for multi language might be able to help.
>>>>>
>>>>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:
>>>>>

Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Alex Koay
CC-ing Chamikara as he got omitted from the reply all I did earlier.

On Thu, Jun 3, 2021 at 12:43 AM Alex Koay  wrote:

> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
> upon several threads saying so.
>
> On Dataflow, I've encountered a few different kinds of issues.
> 1. For the kafka_taxi example, the pipeline would start, the PubSub to
> Kafka would run, but nothing gets read from Kafka (this seems to get
> expanded as Dataflow shows KafkaIO.Read + Remove Kafka Metadata
> sub-transforms).
> 2. For the snippet I shared above, I would vary it either with a "log"
> transform or a direct "write" back to Kafka. Neither seems to work (and the
> steps don't get expanded unlike the kafka_taxi example). With the "write"
> step, I believe it didn't get captured in the Dataflow graph a few times.
> 3. No errors appear in both Job Logs and Worker Logs, except for one
> message emitted from the "log" step if that happens.
>
> All this is happening while I am producing ~4 messages/sec in Kafka. I can
> verify that Kafka is working normally remotely and all (ran into some
> issues setting it up).
> I've also tested the KafkaIO.read transform in Java and can confirm that
> it works as expected.
>
> As an aside, I put together an ExternalTransform for MqttIO which you can
> find here:
> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
> I can confirm that it works in batch mode, but given that I couldn't get
> Kafka to work with Dataflow, I don't have much confidence in getting this
> to work.
>
> Thanks for your help.
>
> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath 
> wrote:
>
>> What error did you run into with Dataflow ? Did you observe any errors in
>> worker logs ?
>> If you follow the steps given in the example here
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/README.md>
>> it should work. Make sure Dataflow workers have access to Kafka bootstrap
>> servers you provide.
>>
>> Portable DirectRunner currently doesn't support streaming mode so you
>> need to convert your pipeline to a batch pipeline and provide
>> 'max_num_records' or 'max_read_time' to convert the Kafka source to a batch
>> source.
>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>>
>> Also portable runners (Flink, Spark etc.) have a known issue related to
>> SDF checkpointing in streaming mode which results in messages not being
>> pushed to subsequent steps. This is tracked in
>> https://issues.apache.org/jira/browse/BEAM-11998.
>>
>> Thanks,
>> Cham
>>
>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:
>>
>>> /cc @Boyuan Zhang  for kafka @Chamikara Jayalath
>>>  for multi language might be able to help.
>>>
>>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have created a simple snippet as such:
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.io.kafka import ReadFromKafka
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>
>>>> import logging
>>>> logging.basicConfig(level=logging.WARNING)
>>>>
>>>> with beam.Pipeline(options=PipelineOptions(
>>>>         ["--runner=DirectRunner", "--streaming"])) as p:
>>>>     (
>>>>         p
>>>>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
>>>>                                   topics=["topic"])
>>>>         | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
>>>>     )
>>>>
>>>> I've set up a Kafka single node similar to the kafka_taxi README, and
>>>> run this both on DirectRunner and DataflowRunner but it doesn't work. What
>>>> I mean by this is that the Transform seems to be capturing data, but
>>>> doesn't pass it on to subsequent transforms.
>>>> With DirectRunner, if I send a non-keyed Kafka message to the server it
>>>> actually crashes (saying that it cannot encode null into a byte[]), hence
>>>> why I believe the transform is actually running.
>>>>
>>>> My main objective really is to create a streaming ExternalTransform for
>>>> MqttIO and SolaceIO (
>>>> https://github.com/SolaceProducts/solace-apache-beam).
>>>> I've implemented the builder and registrars and they work in batch mode
>>>> (with maxNumRecords) but otherwise it fails to read.
>>>>
>>>> With MqttIO, the streaming transform gets stuck waiting for one bundle
>>>> to complete (if I continuously send messages into the MQTT server), and
>>>> after stopping, the bundles finish but nothing gets passed on either.
>>>>
>>>> I appreciate any help I can get with this.
>>>> Thanks!
>>>>
>>>> Cheers
>>>> Alex
>>>>
>>>>
>>>>


Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Alex Koay
Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
upon several threads saying so.
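
(As Cham notes below, a bounded read is one way to at least exercise the
transform on the DirectRunner -- something along these lines, with an
arbitrary record cap:)

from apache_beam.io.kafka import ReadFromKafka

# Bounded variant for local testing: capping the number of records turns the
# Kafka source into a batch source.
read = ReadFromKafka(
    consumer_config={"bootstrap.servers": "localhost:9092"},
    topics=["topic"],
    max_num_records=100,  # arbitrary small cap
)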

On Dataflow, I've encountered a few different kinds of issues.
1. For the kafka_taxi example, the pipeline would start, the PubSub to
Kafka would run, but nothing gets read from Kafka (this seems to get
expanded as Dataflow shows KafkaIO.Read + Remove Kafka Metadata
sub-transforms).
2. For the snippet I shared above, I would vary it either with a "log"
transform or a direct "write" back to Kafka. Neither seems to work (and the
steps don't get expanded unlike the kafka_taxi example). With the "write"
step, I believe it didn't get captured in the Dataflow graph a few times.
3. No errors appear in either the Job Logs or the Worker Logs, except for
one message emitted from the "log" step if that happens.

All this is happening while I am producing ~4 messages/sec in Kafka. I can
verify that Kafka is working normally remotely and all (ran into some
issues setting it up).
I've also tested the KafkaIO.read transform in Java and can confirm that it
works as expected.

As an aside, I put together an ExternalTransform for MqttIO which you can
find here: https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
I can confirm that it works in batch mode, but given that I couldn't get
Kafka to work with Dataflow, I don't have much confidence in getting this
to work.

Thanks for your help.

On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath 
wrote:

> What error did you run into with Dataflow ? Did you observe any errors in
> worker logs ?
> If you follow the steps given in the example here
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/README.md>
> it should work. Make sure Dataflow workers have access to Kafka bootstrap
> servers you provide.
>
> Portable DirectRunner currently doesn't support streaming mode so you need
> to convert your pipeline to a batch pipeline and provide 'max_num_records'
> or 'max_read_time' to convert the Kafka source to a batch source.
> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>
> Also portable runners (Flink, Spark etc.) have a known issue related to
> SDF checkpointing in streaming mode which results in messages not being
> pushed to subsequent steps. This is tracked in
> https://issues.apache.org/jira/browse/BEAM-11998.
>
> Thanks,
> Cham
>
> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:
>
>> /cc @Boyuan Zhang  for kafka @Chamikara Jayalath
>>  for multi language might be able to help.
>>
>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:
>>
>>> Hi all,
>>>
>>> I have created a simple snippet as such:
>>>
>>> import apache_beam as beam
>>> from apache_beam.io.kafka import ReadFromKafka
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>> import logging
>>> logging.basicConfig(level=logging.WARNING)
>>>
>>> with beam.Pipeline(options=PipelineOptions(
>>>         ["--runner=DirectRunner", "--streaming"])) as p:
>>>     (
>>>         p
>>>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
>>>                                   topics=["topic"])
>>>         | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
>>>     )
>>>
>>> I've set up a Kafka single node similar to the kafka_taxi README, and
>>> run this both on DirectRunner and DataflowRunner but it doesn't work. What
>>> I mean by this is that the Transform seems to be capturing data, but
>>> doesn't pass it on to subsequent transforms.
>>> With DirectRunner, if I send a non-keyed Kafka message to the server it
>>> actually crashes (saying that it cannot encode null into a byte[]), hence
>>> why I believe the transform is actually running.
>>>
>>> My main objective really is to create a streaming ExternalTransform for
>>> MqttIO and SolaceIO (
>>> https://github.com/SolaceProducts/solace-apache-beam).
>>> I've implemented the builder and registrars and they work in batch mode
>>> (with maxNumRecords) but otherwise it fails to read.
>>>
>>> With MqttIO, the streaming transform gets stuck waiting for one bundle
>>> to complete (if I continuously send messages into the MQTT server), and
>>> after stopping, the bundles finish but nothing gets passed on either.
>>>
>>> I appreciate any help I can get with this.
>>> Thanks!
>>>
>>> Cheers
>>> Alex
>>>
>>>
>>>


Issues running Kafka streaming pipeline in Python

2021-06-01 Thread Alex Koay
Hi all,

I have created a simple snippet as such:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

import logging
logging.basicConfig(level=logging.WARNING)

with beam.Pipeline(options=PipelineOptions(
        ["--runner=DirectRunner", "--streaming"])) as p:
    (
        p
        | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
                                  topics=["topic"])
        | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
    )

I've set up a single-node Kafka instance similar to the kafka_taxi README,
and run this on both DirectRunner and DataflowRunner, but it doesn't work.
What I mean by this is that the transform seems to be capturing data but
doesn't pass it on to subsequent transforms.
With DirectRunner, if I send a non-keyed Kafka message to the server it
actually crashes (saying that it cannot encode null into a byte[]), which
is why I believe the transform is actually running.

My main objective really is to create a streaming ExternalTransform for
MqttIO and SolaceIO (https://github.com/SolaceProducts/solace-apache-beam).
I've implemented the builder and registrars and they work in batch mode
(with maxNumRecords), but otherwise they fail to read.

With MqttIO, the streaming transform gets stuck waiting for one bundle to
complete (if I continuously send messages into the MQTT server), and after
stopping, the bundles finish but nothing gets passed on either.

I appreciate any help I can get with this.
Thanks!

Cheers
Alex