I am unable to read from Kafka and am getting the following warnings and
errors when calling kafka.ReadFromKafka() (Python SDK):

WARNING:root:severity: WARN
timestamp {
  seconds: 1591370012
  nanos: 523000000
}
message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1
could not be established. Broker may not be available."
log_location: "org.apache.kafka.clients.NetworkClient"
thread: "18"

Finally, the pipeline fails with:

RuntimeError: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while
fetching topic metadata

See the more complete log attached.

The relevant code snippet:

consumer_conf = {"bootstrap.servers": "localhost:9092"}
...
kafka.ReadFromKafka(
    consumer_config=consumer_conf,
    topics=[args.topic],
)
...

Also see the full Python script attached.

I am using Beam version 2.21.0 and the DirectRunner. With the FlinkRunner I
am also unable to read from the topic.

I am using Kafka 2.5.0 and started the broker by following
https://kafka.apache.org/quickstart, using the default
config/server.properties.
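
For completeness, these are roughly the quickstart commands I used to start
the broker and create the topic (paths are relative to the Kafka 2.5.0
distribution; the topic name "inputs" matches the example usage below):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --topic inputs --partitions 1 \
  --replication-factor 1 --bootstrap-server localhost:9092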

Everything runs locally, and I verified that I can publish to and consume
from that topic using the confluent_kafka library.
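
The confluent_kafka check was essentially a smoke test like the following
(sketch only; the topic name "inputs" and the group id are placeholders
matching the example usage below):

from confluent_kafka import Consumer, Producer

# Publish a single test message to the topic.
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("inputs", value=b"hello")
producer.flush()

# Read it back from the beginning of the topic.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "smoke-test",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["inputs"])
msg = consumer.poll(10.0)
print(msg.value() if msg is not None else "no message received")
consumer.close()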

-- 
Best regards,
Piotr

Attachment: beam.log

"""
Example:

# DirectRunner
python pipeline.py --bootstrap_servers=localhost:9092 --topic=inputs

# FlinkRunner
python batch.py --bootstrap_servers=localhost:9092 --topic=inputs \
  --runner=FlinkRunner --flink_version=1.9 \
  --flink_master=localhost:8081 --environment_type=LOOPBACK

"""
import argparse
import logging
import sys

import apache_beam as beam
from apache_beam.io.external import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def format_results(elem):
    (msg, sum_value) = elem
    return f"message: {msg}, sum: {sum_value}"

def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the pipeline."""
    parser = argparse.ArgumentParser()

    parser.add_argument("--bootstrap_servers", type=str,
                        help="Kafka Broker address")
    parser.add_argument("--topic", type=str, help="Kafka topic to read from")
    parser.add_argument("--output", type=str, default="/tmp/kafka-output",
                        help="Output filepath")
    
    args, pipeline_args = parser.parse_known_args(argv)
    
    if args.topic is None or args.bootstrap_servers is None:
        parser.print_usage()
        print(f"{sys.argv[0]}: error: both --topic and --bootstrap_servers are required")
        sys.exit(1)
        
    options = PipelineOptions(pipeline_args)
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    options.view_as(SetupOptions).save_main_session = save_main_session

    # Enforce that this pipeline is always run in streaming mode
    options.view_as(StandardOptions).streaming = True
    
    consumer_conf = {"bootstrap.servers": args.bootstrap_servers}
    print(f"Starting pipeline "
          f"kafka = {args.bootstrap_servers}, topic = {args.topic}")
    with beam.Pipeline(options=options) as p:
        events = (
            p
            | "ReadFromKafka" >> kafka.ReadFromKafka(
                consumer_config=consumer_conf,
                topics=[args.topic],
            )
            | "WindowIntoFixedWindows" >> beam.WindowInto(
                beam.window.FixedWindows(60))
            | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
            | "SumByKey" >> beam.CombinePerKey(sum)
            | "FormatResults" >> beam.Map(format_results)
            | "WriteUserScoreSums" >> beam.io.WriteToText(args.output)
        )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
