I've been teaching myself Beam and here is an example pipeline that uses
Kafka IO (read and write). I hope it helps.

*Prerequisites*
1. Kafka runs on Docker and its external listener is exposed on port 29092
(i.e. its bootstrap server address can be specified as localhost:29092).
2. The following entry exists in /etc/hosts, and the multi-language pipeline
(Kafka IO) runs in a Docker container that uses the Docker host network
(i.e. --network host). See the connectivity check sketch after this list.

*127.0.0.1       host.docker.internal*
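
As a quick sanity check (a minimal sketch, not part of the pipeline; the host
name and port are just the ones assumed above), you can confirm that the
advertised listener is reachable from inside the container before running the
job:

import socket


def check_broker(host: str = "host.docker.internal", port: int = 29092) -> bool:
    # Returns True if a TCP connection to the advertised listener can be opened.
    try:
        with socket.create_connection((host, port), timeout=5):
            return True
    except OSError:
        return False


print("broker reachable:", check_broker())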

*Key notes*
1. Use FlinkRunner with or without specifying *flink_master*. If it is not
specified, an embedded Flink cluster is used. Otherwise, add the
*flink_master* option (e.g. "flink_master": "localhost:8081"). (Check the
--use_own argument in the example below.)
2. The Kafka bootstrap server address should be specified as
*host.docker.internal:29092* (change the port if yours is different).
3. Add *use_deprecated_read* as indicated in an earlier reply; otherwise the
pipeline may get stuck. A quick way to confirm the experiment is set is shown
in the sketch below.
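
If you want to verify that the experiment actually made it into the pipeline
options (a small sketch; the option values are just the ones used in the
example below):

from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

options = PipelineOptions([], streaming=True, experiments=["use_deprecated_read"])
# The experiments flag is surfaced through DebugOptions.
assert "use_deprecated_read" in (options.view_as(DebugOptions).experiments or [])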

import os
import argparse
import json
import logging
import typing

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def decode_message(kafka_kv: tuple):
    # Incoming Kafka records must have a key associated.
    # Otherwise, Beam throws an exception with null keys.
    #   Example: (b'key', b'value')
    return kafka_kv[1].decode("utf-8")


def create_message(element: dict):
    key = {"id": element["id"]}
    print(key)
    return json.dumps(key).encode("utf-8"), json.dumps(element).encode("utf-8")


def parse_json(element: str):
    return json.loads(element)


def run():
    parser = argparse.ArgumentParser(
        description="Kafka IO example"
    )
    parser.add_argument(
        "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
    )
    parser.add_argument(
        "--use_own",
        action="store_true",
        default=False,
        help="Flag to indicate whether to use an own local cluster",
    )
    opts = parser.parse_args()

    pipeline_opts = {
        "runner": opts.runner,
        "job_name": "kafka-io",
        "environment_type": "LOOPBACK",
        "streaming": True,
        "parallelism": 3,
        "experiments": [
            "use_deprecated_read"
        ],  ## https://github.com/apache/beam/issues/20979
        "checkpointing_interval": "60000",
    }
    if opts.use_own:
        pipeline_opts = {**pipeline_opts, "flink_master": "localhost:8081"}
    print(pipeline_opts)
    options = PipelineOptions([], **pipeline_opts)
    # Required; otherwise the workers fail to import functions defined in the main session
    options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=options)
    (
        p
        | "Read from Kafka"
        >> kafka.ReadFromKafka(
            consumer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                "auto.offset.reset": "earliest",
                # "enable.auto.commit": "true",
                "group.id": "kafka-io",
            },
            topics=["input-topic"],
        )
        | "Decode messages" >> beam.Map(decode_message)
        | "Parse elements" >> beam.Map(parse_json)
        | "Create messages"
        >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
        | "Write to Kafka"
        >> kafka.WriteToKafka(
            producer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                )
            },
            topic="output-topic",
        )
    )

    logging.getLogger().setLevel(logging.WARN)
    logging.info("Building pipeline ...")

    p.run().wait_until_finish()


if __name__ == "__main__":
    run()
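
To exercise the pipeline end to end, you can seed the input topic and tail the
output topic from the host. A minimal sketch, assuming the kafka-python package
is installed and the broker is reachable at localhost:29092 (the topic names
match the example above):

import json

from kafka import KafkaConsumer, KafkaProducer

BOOTSTRAP = "localhost:29092"

producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP,
    key_serializer=lambda k: json.dumps(k).encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
# The pipeline expects keyed records whose value is a JSON object with an "id" field.
for i in range(3):
    producer.send("input-topic", key={"id": i}, value={"id": i, "name": f"name-{i}"})
producer.flush()

consumer = KafkaConsumer(
    "output-topic",
    bootstrap_servers=BOOTSTRAP,
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,
)
for record in consumer:
    print(record.key, record.value)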


On Sat, 9 Mar 2024 at 08:12, Chamikara Jayalath via user <
user@beam.apache.org> wrote:

> Which runner are you using ?
>
> There's a known issue with SDFs not triggering for portable runners:
> https://github.com/apache/beam/issues/20979
>
> This should not occur for Dataflow.
> For Flink, you could use the option "--experiments=use_deprecated_read" to
> make it work.
>
> Thanks,
> Cham
>
> On Fri, Mar 8, 2024 at 8:10 AM LDesire <two_som...@icloud.com> wrote:
>
>> Hello Apache Beam community.
>> I'm asking because while creating a beam pipeline in Python,
>> ReadFromKafka is not working.
>>
>> My code looks like this
>>
>> ```
>> @beam.ptransform_fn
>> def LogElements(input):
>>     def log_element(elem):
>>         print(elem)
>>         return elem
>>
>>     return (
>>             input | 'DoLog' >> Map(log_element)
>>     )
>>
>>
>> if __name__ == '__main__':
>>     consumer_config = {
>>         'bootstrap.servers': '<bootstrap_server>'
>>     }
>>
>>     with beam.Pipeline(options=PipelineOptions(['--streaming'])) as p:
>>         (
>>                 p | ReadFromKafka(consumer_config=consumer_config,
>> topics=['input_topic'])
>>                 | "ToLines" >> beam.Map(lambda x: "%s %s" %
>> (x[0].decode("utf-8"), x[1].decode("utf-8")))
>>                 | 'Logging' >> LogElements()
>>         )
>> ```
>>
>> This is a simple pipeline that simply subscribes to a topic in Kafka and
>> outputs to the console.
>>
>> I've seen in the documentation that using ReadFromKafka in Python creates
>> an external java processor.
>> We also confirmed that it creates a process in VisualVM just fine.
>>
>> However, I'm not sure why I'm not getting messages from Kafka.
>>
>> * I checked the `_really_start_process` method of the `SubprocessServer`
>> class in subprocess_server.py to get the logs from the external java
>> process.
>>
>>
>>
>>
