Re: Issues running Kafka streaming pipeline in Python

2021-06-04 Thread Robert Bradshaw
Glad you were able to figure it out.

Maybe it's moot with runner v2 becoming the default, but we really
should give a clearer error in this case.

On Wed, Jun 2, 2021 at 8:16 PM Chamikara Jayalath  wrote:
>
> Great :)
>
> On Wed, Jun 2, 2021 at 8:15 PM Alex Koay  wrote:
>>
>> Finally figured out the issue.
>> Can confirm that the kafka_taxi job is working as expected now.
>> The issue was that I ran the Dataflow job with an invalid experiments flag 
>> (runner_v2 instead of use_runner_v2), and I was getting logging messages (on 
>> 2.29) that said that I was using Runner V2 even though it seems that I 
>> wasn't.
>> Setting the correct flag fixes the issue (and so I get to see the correctly 
>> expanded transforms in the graph).
>> Thanks for your help Cham!
>>
>> Cheers
>> Alex
>>
>> On Thu, Jun 3, 2021 at 1:07 AM Chamikara Jayalath  
>> wrote:
>>>
>>> Can you mention the Job Logs you see in the Dataflow Cloud Console page for 
>>> your job ? Can you also mention the pipeline and configs you used for 
>>> Dataflow (assuming it's different from what's given in the example) ?
>>> Make sure that you used Dataflow Runner v2 (as given in the example).
>>> Are you providing null keys by any chance ? There's a known issue related 
>>> to that (but if you are just running the example, it should generate 
>>> appropriate keys).
>>>
>>> Unfortunately for actually debugging your job, I need a Dataflow customer 
>>> support ticket.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Wed, Jun 2, 2021 at 9:45 AM Alex Koay  wrote:

 CC-ing Chamikara as he got omitted from the reply all I did earlier.

 On Thu, Jun 3, 2021 at 12:43 AM Alex Koay  wrote:
>
> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled 
> upon several threads saying so.
>
> On Dataflow, I've encountered a few different kinds of issues.
> 1. For the kafka_taxi example, the pipeline would start, the PubSub to 
> Kafka would run, but nothing gets read from Kafka (this seems to get 
> expanded as Dataflow shows KafkaIO.Read + Remove Kafka Metadata 
> sub-transforms.
> 2. For the snippet I shared above, I would vary it either with a "log" 
> transform or a direct "write" back to Kafka. Neither seems to work (and 
> the steps don't get expanded unlike the kafka_taxi example). With the 
> "write" step, I believe it didn't get captured in the Dataflow graph a 
> few times.
> 3. No errors appear in both Job Logs and Worker Logs, except for one 
> message emitted from the "log" step if that happens.
>
> All this is happening while I am producing ~4 messages/sec in Kafka. I 
> can verify that Kafka is working normally remotely and all (ran into some 
> issues setting it up).
> I've also tested the KafkaIO.read transform in Java and can confirm that 
> it works as expected.
>
> As an aside, I put together an ExternalTransform for MqttIO which you can 
> find here: 
> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
> I can confirm that it works in batch mode, but given that I couldn't get 
> Kafka to work with Dataflow, I don't have much confidence in getting this 
> to work.
>
> Thanks for your help.
>
> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath  
> wrote:
>>
>> What error did you run into with Dataflow ? Did you observe any errors 
>> in worker logs ?
>> If you follow the steps given in the example here it should work. Make 
>> sure Dataflow workers have access to Kafka bootstrap servers you provide.
>>
>> Portable DirectRunner currently doesn't support streaming mode so you 
>> need to convert your pipeline to a batch pipeline and provide 
>> 'max_num_records' or 'max_read_time' to convert the Kafka source to a 
>> batch source.
>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>>
>> Also portable runners (Flink, Spark etc.) have a known issue related to 
>> SDF checkpointing in streaming mode which results in messages not being 
>> pushed to subsequent steps. This is tracked in 
>> https://issues.apache.org/jira/browse/BEAM-11998.
>>
>> Thanks,
>> Cham
>>
>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:
>>>
>>> /cc @Boyuan Zhang for kafka @Chamikara Jayalath for multi language 
>>> might be able to help.
>>>
>>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:

 Hi all,

 I have created a simple snippet as such:

 import apache_beam as beam
 from apache_beam.io.kafka import ReadFromKafka
 from apache_beam.options.pipeline_options import PipelineOptions

 import logging
 logging.basicConfig(level=logging.WARNING)

 opts = direct_opts
 with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner", 
 "--streaming"])) as p:

Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Alex Koay
Finally figured out the issue.
Can confirm that the kafka_taxi job is working as expected now.
The issue was that I ran the Dataflow job with an invalid experiments flag
(runner_v2 instead of use_runner_v2), and I was getting logging messages
(on 2.29) that said that I was using Runner V2 even though it seems that I
wasn't.
Setting the correct flag fixes the issue (and so I get to see the correctly
expanded transforms in the graph).
Thanks for your help Cham!

Cheers
Alex

On Thu, Jun 3, 2021 at 1:07 AM Chamikara Jayalath 
wrote:

> Can you mention the Job Logs you see in the Dataflow Cloud Console page
> for your job ? Can you also mention the pipeline and configs you used for
> Dataflow (assuming it's different from what's given in the example) ?
> Make sure that you used Dataflow Runner v2 (as given in the example).
> Are you providing null keys by any chance ? There's a known issue related
> to that (but if you are just running the example, it should generate
> appropriate keys).
>
> Unfortunately for actually debugging your job, I need a Dataflow customer 
> support
> ticket .
>
> Thanks,
> Cham
>
> On Wed, Jun 2, 2021 at 9:45 AM Alex Koay  wrote:
>
>> CC-ing Chamikara as he got omitted from the reply all I did earlier.
>>
>> On Thu, Jun 3, 2021 at 12:43 AM Alex Koay  wrote:
>>
>>> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
>>> upon several threads saying so.
>>>
>>> On Dataflow, I've encountered a few different kinds of issues.
>>> 1. For the kafka_taxi example, the pipeline would start, the PubSub to
>>> Kafka would run, but nothing gets read from Kafka (this seems to get
>>> expanded as Dataflow shows KafkaIO.Read + Remove Kafka Metadata
>>> sub-transforms.
>>> 2. For the snippet I shared above, I would vary it either with a "log"
>>> transform or a direct "write" back to Kafka. Neither seems to work (and the
>>> steps don't get expanded unlike the kafka_taxi example). With the "write"
>>> step, I believe it didn't get captured in the Dataflow graph a few times.
>>> 3. No errors appear in both Job Logs and Worker Logs, except for one
>>> message emitted from the "log" step if that happens.
>>>
>>> All this is happening while I am producing ~4 messages/sec in Kafka. I
>>> can verify that Kafka is working normally remotely and all (ran into some
>>> issues setting it up).
>>> I've also tested the KafkaIO.read transform in Java and can confirm that
>>> it works as expected.
>>>
>>> As an aside, I put together an ExternalTransform for MqttIO which you
>>> can find here:
>>> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
>>> I can confirm that it works in batch mode, but given that I couldn't get
>>> Kafka to work with Dataflow, I don't have much confidence in getting this
>>> to work.
>>>
>>> Thanks for your help.
>>>
>>> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath 
>>> wrote:
>>>
 What error did you run into with Dataflow ? Did you observe any errors
 in worker logs ?
 If you follow the steps given in the example here
 
 it should work. Make sure Dataflow workers have access to Kafka bootstrap
 servers you provide.

 Portable DirectRunner currently doesn't support streaming mode so you
 need to convert your pipeline to a batch pipeline and provide
 'max_num_records' or 'max_read_time' to convert the Kafka source to a batch
 source.
 This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.

 Also portable runners (Flink, Spark etc.) have a known issue related to
 SDF checkpointing in streaming mode which results in messages not being
 pushed to subsequent steps. This is tracked in
 https://issues.apache.org/jira/browse/BEAM-11998.

 Thanks,
 Cham

 On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:

> /cc @Boyuan Zhang  for kafka @Chamikara Jayalath
>  for multi language might be able to help.
>
> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:
>
>> Hi all,
>>
>> I have created a simple snippet as such:
>>
>> import apache_beam as beam
>> from apache_beam.io.kafka import ReadFromKafka
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> import logging
>> logging.basicConfig(level=logging.WARNING)
>>
>> opts = direct_opts
>> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
>> "--streaming"])) as p:
>> (
>> p
>> | "read" >> ReadFromKafka({"bootstrap.servers":
>> f"localhost:9092"}, topics=["topic"])
>> | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x))
>> )
>>
>> I've set up a Kafka single node similar to the kafka_taxi README, and
>> run this both on DirectRunner and DataflowRunner but it doesn't work. 
>> What
>> I mean by 

Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Chamikara Jayalath
Can you mention the Job Logs you see in the Dataflow Cloud Console page for
your job ? Can you also mention the pipeline and configs you used for
Dataflow (assuming it's different from what's given in the example) ?
Make sure that you used Dataflow Runner v2 (as given in the example).
Are you providing null keys by any chance ? There's a known issue related
to that (but if you are just running the example, it should generate
appropriate keys).

Unfortunately for actually debugging your job, I need a Dataflow
customer support
ticket .

Thanks,
Cham

On Wed, Jun 2, 2021 at 9:45 AM Alex Koay  wrote:

> CC-ing Chamikara as he got omitted from the reply all I did earlier.
>
> On Thu, Jun 3, 2021 at 12:43 AM Alex Koay  wrote:
>
>> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
>> upon several threads saying so.
>>
>> On Dataflow, I've encountered a few different kinds of issues.
>> 1. For the kafka_taxi example, the pipeline would start, the PubSub to
>> Kafka would run, but nothing gets read from Kafka (this seems to get
>> expanded as Dataflow shows KafkaIO.Read + Remove Kafka Metadata
>> sub-transforms.
>> 2. For the snippet I shared above, I would vary it either with a "log"
>> transform or a direct "write" back to Kafka. Neither seems to work (and the
>> steps don't get expanded unlike the kafka_taxi example). With the "write"
>> step, I believe it didn't get captured in the Dataflow graph a few times.
>> 3. No errors appear in both Job Logs and Worker Logs, except for one
>> message emitted from the "log" step if that happens.
>>
>> All this is happening while I am producing ~4 messages/sec in Kafka. I
>> can verify that Kafka is working normally remotely and all (ran into some
>> issues setting it up).
>> I've also tested the KafkaIO.read transform in Java and can confirm that
>> it works as expected.
>>
>> As an aside, I put together an ExternalTransform for MqttIO which you can
>> find here:
>> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
>> I can confirm that it works in batch mode, but given that I couldn't get
>> Kafka to work with Dataflow, I don't have much confidence in getting this
>> to work.
>>
>> Thanks for your help.
>>
>> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath 
>> wrote:
>>
>>> What error did you run into with Dataflow ? Did you observe any errors
>>> in worker logs ?
>>> If you follow the steps given in the example here
>>> 
>>> it should work. Make sure Dataflow workers have access to Kafka bootstrap
>>> servers you provide.
>>>
>>> Portable DirectRunner currently doesn't support streaming mode so you
>>> need to convert your pipeline to a batch pipeline and provide
>>> 'max_num_records' or 'max_read_time' to convert the Kafka source to a batch
>>> source.
>>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>>>
>>> Also portable runners (Flink, Spark etc.) have a known issue related to
>>> SDF checkpointing in streaming mode which results in messages not being
>>> pushed to subsequent steps. This is tracked in
>>> https://issues.apache.org/jira/browse/BEAM-11998.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:
>>>
 /cc @Boyuan Zhang  for kafka @Chamikara Jayalath
  for multi language might be able to help.

 On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:

> Hi all,
>
> I have created a simple snippet as such:
>
> import apache_beam as beam
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
> import logging
> logging.basicConfig(level=logging.WARNING)
>
> opts = direct_opts
> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
> "--streaming"])) as p:
> (
> p
> | "read" >> ReadFromKafka({"bootstrap.servers":
> f"localhost:9092"}, topics=["topic"])
> | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x))
> )
>
> I've set up a Kafka single node similar to the kafka_taxi README, and
> run this both on DirectRunner and DataflowRunner but it doesn't work. What
> I mean by this is that the Transform seems to be capturing data, but
> doesn't pass it on to subsequent transforms.
> With DirectRunner, if I send a non-keyed Kafka message to the server
> it actually crashes (saying that it cannot encode null into a byte[]),
> hence why I believe the transform is actually running.
>
> My main objective really is to create a streaming ExternalTransform
> for MqttIO and SolaceIO (
> https://github.com/SolaceProducts/solace-apache-beam).
> I've implemented the builder and registrars and they work in batch
> mode (with maxNumRecords) but otherwise it fails to read.
>
> With MqttIO, the 

Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Alex Koay
CC-ing Chamikara as he got omitted from the reply all I did earlier.

On Thu, Jun 3, 2021 at 12:43 AM Alex Koay  wrote:

> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
> upon several threads saying so.
>
> On Dataflow, I've encountered a few different kinds of issues.
> 1. For the kafka_taxi example, the pipeline would start, the PubSub to
> Kafka would run, but nothing gets read from Kafka (this seems to get
> expanded as Dataflow shows KafkaIO.Read + Remove Kafka Metadata
> sub-transforms.
> 2. For the snippet I shared above, I would vary it either with a "log"
> transform or a direct "write" back to Kafka. Neither seems to work (and the
> steps don't get expanded unlike the kafka_taxi example). With the "write"
> step, I believe it didn't get captured in the Dataflow graph a few times.
> 3. No errors appear in both Job Logs and Worker Logs, except for one
> message emitted from the "log" step if that happens.
>
> All this is happening while I am producing ~4 messages/sec in Kafka. I can
> verify that Kafka is working normally remotely and all (ran into some
> issues setting it up).
> I've also tested the KafkaIO.read transform in Java and can confirm that
> it works as expected.
>
> As an aside, I put together an ExternalTransform for MqttIO which you can
> find here:
> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
> I can confirm that it works in batch mode, but given that I couldn't get
> Kafka to work with Dataflow, I don't have much confidence in getting this
> to work.
>
> Thanks for your help.
>
> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath 
> wrote:
>
>> What error did you run into with Dataflow ? Did you observe any errors in
>> worker logs ?
>> If you follow the steps given in the example here
>> 
>> it should work. Make sure Dataflow workers have access to Kafka bootstrap
>> servers you provide.
>>
>> Portable DirectRunner currently doesn't support streaming mode so you
>> need to convert your pipeline to a batch pipeline and provide
>> 'max_num_records' or 'max_read_time' to convert the Kafka source to a batch
>> source.
>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>>
>> Also portable runners (Flink, Spark etc.) have a known issue related to
>> SDF checkpointing in streaming mode which results in messages not being
>> pushed to subsequent steps. This is tracked in
>> https://issues.apache.org/jira/browse/BEAM-11998.
>>
>> Thanks,
>> Cham
>>
>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:
>>
>>> /cc @Boyuan Zhang  for kafka @Chamikara Jayalath
>>>  for multi language might be able to help.
>>>
>>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:
>>>
 Hi all,

 I have created a simple snippet as such:

 import apache_beam as beam
 from apache_beam.io.kafka import ReadFromKafka
 from apache_beam.options.pipeline_options import PipelineOptions

 import logging
 logging.basicConfig(level=logging.WARNING)

 opts = direct_opts
 with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
 "--streaming"])) as p:
 (
 p
 | "read" >> ReadFromKafka({"bootstrap.servers":
 f"localhost:9092"}, topics=["topic"])
 | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x))
 )

 I've set up a Kafka single node similar to the kafka_taxi README, and
 run this both on DirectRunner and DataflowRunner but it doesn't work. What
 I mean by this is that the Transform seems to be capturing data, but
 doesn't pass it on to subsequent transforms.
 With DirectRunner, if I send a non-keyed Kafka message to the server it
 actually crashes (saying that it cannot encode null into a byte[]), hence
 why I believe the transform is actually running.

 My main objective really is to create a streaming ExternalTransform for
 MqttIO and SolaceIO (
 https://github.com/SolaceProducts/solace-apache-beam).
 I've implemented the builder and registrars and they work in batch mode
 (with maxNumRecords) but otherwise it fails to read.

 With MqttIO, the streaming transform gets stuck waiting for one bundle
 to complete (if I continuously send messages into the MQTT server), and
 after stopping, the bundles finish but nothing gets passed on either.

 I appreciate any help I can get with this.
 Thanks!

 Cheers
 Alex





Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Alex Koay
Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
upon several threads saying so.

On Dataflow, I've encountered a few different kinds of issues.
1. For the kafka_taxi example, the pipeline would start, the PubSub to
Kafka would run, but nothing gets read from Kafka (this seems to get
expanded as Dataflow shows KafkaIO.Read + Remove Kafka Metadata
sub-transforms.
2. For the snippet I shared above, I would vary it either with a "log"
transform or a direct "write" back to Kafka. Neither seems to work (and the
steps don't get expanded unlike the kafka_taxi example). With the "write"
step, I believe it didn't get captured in the Dataflow graph a few times.
3. No errors appear in both Job Logs and Worker Logs, except for one
message emitted from the "log" step if that happens.

All this is happening while I am producing ~4 messages/sec in Kafka. I can
verify that Kafka is working normally remotely and all (ran into some
issues setting it up).
I've also tested the KafkaIO.read transform in Java and can confirm that it
works as expected.

As an aside, I put together an ExternalTransform for MqttIO which you can
find here: https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
I can confirm that it works in batch mode, but given that I couldn't get
Kafka to work with Dataflow, I don't have much confidence in getting this
to work.

Thanks for your help.

On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath 
wrote:

> What error did you run into with Dataflow ? Did you observe any errors in
> worker logs ?
> If you follow the steps given in the example here
> 
> it should work. Make sure Dataflow workers have access to Kafka bootstrap
> servers you provide.
>
> Portable DirectRunner currently doesn't support streaming mode so you need
> to convert your pipeline to a batch pipeline and provide 'max_num_records'
> or 'max_read_time' to convert the Kafka source to a batch source.
> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>
> Also portable runners (Flink, Spark etc.) have a known issue related to
> SDF checkpointing in streaming mode which results in messages not being
> pushed to subsequent steps. This is tracked in
> https://issues.apache.org/jira/browse/BEAM-11998.
>
> Thanks,
> Cham
>
> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:
>
>> /cc @Boyuan Zhang  for kafka @Chamikara Jayalath
>>  for multi language might be able to help.
>>
>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:
>>
>>> Hi all,
>>>
>>> I have created a simple snippet as such:
>>>
>>> import apache_beam as beam
>>> from apache_beam.io.kafka import ReadFromKafka
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>> import logging
>>> logging.basicConfig(level=logging.WARNING)
>>>
>>> opts = direct_opts
>>> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
>>> "--streaming"])) as p:
>>> (
>>> p
>>> | "read" >> ReadFromKafka({"bootstrap.servers":
>>> f"localhost:9092"}, topics=["topic"])
>>> | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x))
>>> )
>>>
>>> I've set up a Kafka single node similar to the kafka_taxi README, and
>>> run this both on DirectRunner and DataflowRunner but it doesn't work. What
>>> I mean by this is that the Transform seems to be capturing data, but
>>> doesn't pass it on to subsequent transforms.
>>> With DirectRunner, if I send a non-keyed Kafka message to the server it
>>> actually crashes (saying that it cannot encode null into a byte[]), hence
>>> why I believe the transform is actually running.
>>>
>>> My main objective really is to create a streaming ExternalTransform for
>>> MqttIO and SolaceIO (
>>> https://github.com/SolaceProducts/solace-apache-beam).
>>> I've implemented the builder and registrars and they work in batch mode
>>> (with maxNumRecords) but otherwise it fails to read.
>>>
>>> With MqttIO, the streaming transform gets stuck waiting for one bundle
>>> to complete (if I continuously send messages into the MQTT server), and
>>> after stopping, the bundles finish but nothing gets passed on either.
>>>
>>> I appreciate any help I can get with this.
>>> Thanks!
>>>
>>> Cheers
>>> Alex
>>>
>>>
>>>


Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Chamikara Jayalath
What error did you run into with Dataflow ? Did you observe any errors in
worker logs ?
If you follow the steps given in the example here

it should work. Make sure Dataflow workers have access to Kafka bootstrap
servers you provide.

Portable DirectRunner currently doesn't support streaming mode so you need
to convert your pipeline to a batch pipeline and provide 'max_num_records'
or 'max_read_time' to convert the Kafka source to a batch source.
This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.

Also portable runners (Flink, Spark etc.) have a known issue related to SDF
checkpointing in streaming mode which results in messages not being pushed
to subsequent steps. This is tracked in
https://issues.apache.org/jira/browse/BEAM-11998.

Thanks,
Cham

On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:

> /cc @Boyuan Zhang  for kafka @Chamikara Jayalath
>  for multi language might be able to help.
>
> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:
>
>> Hi all,
>>
>> I have created a simple snippet as such:
>>
>> import apache_beam as beam
>> from apache_beam.io.kafka import ReadFromKafka
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> import logging
>> logging.basicConfig(level=logging.WARNING)
>>
>> opts = direct_opts
>> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
>> "--streaming"])) as p:
>> (
>> p
>> | "read" >> ReadFromKafka({"bootstrap.servers":
>> f"localhost:9092"}, topics=["topic"])
>> | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x))
>> )
>>
>> I've set up a Kafka single node similar to the kafka_taxi README, and run
>> this both on DirectRunner and DataflowRunner but it doesn't work. What I
>> mean by this is that the Transform seems to be capturing data, but doesn't
>> pass it on to subsequent transforms.
>> With DirectRunner, if I send a non-keyed Kafka message to the server it
>> actually crashes (saying that it cannot encode null into a byte[]), hence
>> why I believe the transform is actually running.
>>
>> My main objective really is to create a streaming ExternalTransform for
>> MqttIO and SolaceIO (https://github.com/SolaceProducts/solace-apache-beam
>> ).
>> I've implemented the builder and registrars and they work in batch mode
>> (with maxNumRecords) but otherwise it fails to read.
>>
>> With MqttIO, the streaming transform gets stuck waiting for one bundle to
>> complete (if I continuously send messages into the MQTT server), and after
>> stopping, the bundles finish but nothing gets passed on either.
>>
>> I appreciate any help I can get with this.
>> Thanks!
>>
>> Cheers
>> Alex
>>
>>
>>


Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Ahmet Altay
/cc @Boyuan Zhang  for kafka @Chamikara Jayalath
 for multi language might be able to help.

On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:

> Hi all,
>
> I have created a simple snippet as such:
>
> import apache_beam as beam
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
> import logging
> logging.basicConfig(level=logging.WARNING)
>
> opts = direct_opts
> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
> "--streaming"])) as p:
> (
> p
> | "read" >> ReadFromKafka({"bootstrap.servers":
> f"localhost:9092"}, topics=["topic"])
> | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x))
> )
>
> I've set up a Kafka single node similar to the kafka_taxi README, and run
> this both on DirectRunner and DataflowRunner but it doesn't work. What I
> mean by this is that the Transform seems to be capturing data, but doesn't
> pass it on to subsequent transforms.
> With DirectRunner, if I send a non-keyed Kafka message to the server it
> actually crashes (saying that it cannot encode null into a byte[]), hence
> why I believe the transform is actually running.
>
> My main objective really is to create a streaming ExternalTransform for
> MqttIO and SolaceIO (https://github.com/SolaceProducts/solace-apache-beam
> ).
> I've implemented the builder and registrars and they work in batch mode
> (with maxNumRecords) but otherwise it fails to read.
>
> With MqttIO, the streaming transform gets stuck waiting for one bundle to
> complete (if I continuously send messages into the MQTT server), and after
> stopping, the bundles finish but nothing gets passed on either.
>
> I appreciate any help I can get with this.
> Thanks!
>
> Cheers
> Alex
>
>
>