I am unable to read from Kafka and am getting the following warnings and errors when calling kafka.ReadFromKafka() (Python SDK):

    WARNING:root:severity: WARN timestamp { seconds: 1591370012 nanos: 523000000 } message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 could not be established. Broker may not be available." log_location: "org.apache.kafka.clients.NetworkClient" thread: "18"

Finally the pipeline fails with:

    RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

See the more complete log attached. The relevant code snippet:

    consumer_conf = {"bootstrap.servers": 'localhost:9092'}
    ...
    kafka.ReadFromKafka(
        consumer_config=consumer_conf,
        topics=[args.topic],
    )
    ...

Also see the full Python script attached.

I am using Beam 2.21.0 with the DirectRunner; with the FlinkRunner I am also unable to read from the topic. The broker is Kafka 2.5.0, started by following https://kafka.apache.org/quickstart with the default config/server.properties. Everything runs locally, and I verified that I can publish and consume from that topic using the confluent_kafka library (see the P.S. below for a sketch of that check).

--
Best regards,
Piotr
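P.S. The confluent_kafka round-trip check I ran against the same broker was along these lines (a minimal sketch; the topic name "inputs" and the group id are placeholders, the broker address is the same localhost:9092):

    from confluent_kafka import Consumer, Producer

    BROKER = "localhost:9092"
    TOPIC = "inputs"  # placeholder: the same topic passed to the pipeline via --topic

    # Publish a test message.
    producer = Producer({"bootstrap.servers": BROKER})
    producer.produce(TOPIC, value=b"hello")
    producer.flush()

    # Read it back with a fresh consumer group.
    consumer = Consumer({
        "bootstrap.servers": BROKER,
        "group.id": "smoke-test",  # placeholder group id
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe([TOPIC])
    msg = consumer.poll(10.0)
    if msg is None or msg.error():
        print("consume failed:", None if msg is None else msg.error())
    else:
        print("consumed:", msg.value())
    consumer.close()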
Attachment: beam.log
""" Example: # DirectRunner python pipeline.py --bootstrap_servers=localhost:9092 --topic=inputs # FlinkRunner python batch.py --bootstrap_servers=localhost:9092 --topic=inputs \ --runner=FlinkRunner --flink_version=1.9 \ --flink_master=localhost:8081 --environment_type=LOOPBACK """ import argparse import logging import apache_beam as beam from apache_beam.io.external import kafka from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions def format_results(elem): (msg, sum_value) = elem return f"message: {msg}, sum: {sum_value}" def run(argv=None, save_main_session=True): """Main entry point; defines and runs the pipeline.""" parser = argparse.ArgumentParser() parser.add_argument("--bootstrap_servers", type=str, help="Kafka Broker address") parser.add_argument("--topic", type=str, help="Kafka topic to read from") parser.add_argument("--output", type=str, default="/tmp/kafka-output", help="Output filepath") args, pipeline_args = parser.parse_known_args(argv) if args.topic is None or args.bootstrap_servers is None: parser.print_usage() print(f"{sys.argv[0]}: error: both --topic and --bootstrap_servers are required") sys.exit(1) options = PipelineOptions(pipeline_args) # We use the save_main_session option because one or more DoFn's in this # workflow rely on global context (e.g., a module imported at module level). options.view_as(SetupOptions).save_main_session = save_main_session # Enforce that this pipeline is always run in streaming mode options.view_as(StandardOptions).streaming = True consumer_conf = {"bootstrap.servers": args.bootstrap_servers} print(f"Starting pipeline " f"kafka = {args.bootstrap_servers}, topic = {args.topic}") with beam.Pipeline(options=options) as p: events = ( p | "ReadFromKafka" >> kafka.ReadFromKafka( consumer_config=consumer_conf, topics=[args.topic], ) | "WindowIntoFixedWindows" >> beam.WindowInto( beam.window.FixedWindows(60)) | "AddOnes" >> beam.Map(lambda msg: (msg, 1)) | "SumByKey" >> beam.CombinePerKey(sum) | "FormatResults" >> beam.Map(format_results) | "WriteUserScoreSums" >> beam.io.WriteToText(args.output) ) if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) run()