/cc @Boyuan Zhang <boyu...@google.com> for Kafka; @Chamikara Jayalath
<chamik...@google.com> for multi-language transforms might be able to help.

On Tue, Jun 1, 2021 at 9:39 PM Alex Koay <alexkoa...@gmail.com> wrote:

> Hi all,
>
> I have created a simple snippet as such:
>
> import apache_beam as beam
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
> import logging
> logging.basicConfig(level=logging.WARNING)
>
> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
> "--streaming"])) as p:
>     (
>         p
>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
> topics=["topic"])
>         | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
>     )
>
> I've set up a single-node Kafka broker similar to the kafka_taxi README, and
> run this on both the DirectRunner and the DataflowRunner, but it doesn't
> work. What I mean by this is that the transform seems to be capturing data
> but doesn't pass it on to subsequent transforms.
> With the DirectRunner, if I send a non-keyed Kafka message to the server it
> actually crashes (saying that it cannot encode null into a byte[]), which
> is why I believe the transform is actually running.
>
> My main objective really is to create a streaming ExternalTransform for
> MqttIO and SolaceIO (https://github.com/SolaceProducts/solace-apache-beam
> ).
> I've implemented the builders and registrars, and they work in batch mode
> (with maxNumRecords), but otherwise the read fails.
>
> With MqttIO, the streaming transform gets stuck waiting for one bundle to
> complete (if I continuously send messages into the MQTT server), and after
> stopping, the bundles finish but nothing gets passed on either.
>
> I appreciate any help I can get with this.
> Thanks!
>
> Cheers
> Alex
