Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
I am unable to read from Kafka and getting the following warnings & errors when calling kafka.ReadFromKafka() (Python SDK): WARNING:root:severity: WARN timestamp { seconds: 1591370012 nanos: 52300 } message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 could not be established. Broker may not be available." log_location: "org.apache.kafka.clients.NetworkClient" thread: "18" Finally the pipeline fails with: RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata See more complete log attached. The relevant code snippet: consumer_conf = {"bootstrap.servers": 'localhost:9092'} ... kafka.ReadFromKafka( consumer_config=consumer_conf, topics=[args.topic], ) ... Also see full python script attached. I am using Beam Version: 2.21.0 and DirectRunner. For Flink Runner I am also not able to read from topic. I am using kafka 2.5.0 and started the broker by following https://kafka.apache.org/quickstart - using default config/server.properties. Everything runs locally, and I verified that I can publish&consume from that topic using confluent_kafka library. -- Best regards, Piotr beam.log Description: Binary data """ Example: # DirectRunner python pipeline.py --bootstrap_servers=localhost:9092 --topic=inputs # FlinkRunner python batch.py --bootstrap_servers=localhost:9092 --topic=inputs \ --runner=FlinkRunner --flink_version=1.9 \ --flink_master=localhost:8081 --environment_type=LOOPBACK """ import argparse import logging import apache_beam as beam from apache_beam.io.external import kafka from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions def format_results(elem): (msg, sum_value) = elem return f"message: {msg}, sum: {sum_value}" def run(argv=None, save_main_session=True): """Main entry point; defines and runs the pipeline.""" parser = argparse.ArgumentParser() parser.add_argument("--bootstrap_servers", type=str, help="Kafka Broker address") parser.add_argument("--topic", type=str, help="Kafka topic to read from") parser.add_argument("--output", type=str, default="/tmp/kafka-output", help="Output filepath") args, pipeline_args = parser.parse_known_args(argv) if args.topic is None or args.bootstrap_servers is None: parser.print_usage() print(f"{sys.argv[0]}: error: both --topic and --bootstrap_servers are required") sys.exit(1) options = PipelineOptions(pipeline_args) # We use the save_main_session option because one or more DoFn's in this # workflow rely on global context (e.g., a module imported at module level). options.view_as(SetupOptions).save_main_session = save_main_session # Enforce that this pipeline is always run in streaming mode options.view_as(StandardOptions).streaming = True consumer_conf = {"bootstrap.servers": args.bootstrap_servers} print(f"Starting pipeline " f"kafka = {args.bootstrap_servers}, topic = {args.topic}") with beam.Pipeline(options=options) as p: events = ( p | "ReadFromKafka" >> kafka.ReadFromKafka( consumer_config=consumer_conf, topics=[args.topic], ) | "WindowIntoFixedWindows" >> beam.WindowInto( beam.window.FixedWindows(60)) | "AddOnes" >> beam.Map(lambda msg: (msg, 1)) | "SumByKey" >> beam.CombinePerKey(sum) | "FormatResults" >> beam.Map(format_results) | "WriteUserScoreSums" >> beam.io.WriteToText(args.output) ) if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) run()
Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
+dev +Chamikara Jayalath +Heejong Lee On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk wrote: > I am unable to read from Kafka and getting the following warnings & errors > when calling kafka.ReadFromKafka() (Python SDK): > > WARNING:root:severity: WARN > timestamp { > seconds: 1591370012 > nanos: 52300 > } > message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 > could not be established. Broker may not be available." > log_location: "org.apache.kafka.clients.NetworkClient" > thread: "18" > > Finally the pipeline fails with: > > RuntimeError: org.apache.beam.sdk.util.UserCodeException: > java.lang.RuntimeException: > org.apache.kafka.common.errors.TimeoutException: Timeout expired while > fetching topic metadata > > See more complete log attached. > > The relevant code snippet: > > consumer_conf = {"bootstrap.servers": 'localhost:9092'} > ... > kafka.ReadFromKafka( > consumer_config=consumer_conf, > topics=[args.topic], > ) > ... > > Also see full python script attached. > > I am using Beam Version: 2.21.0 and DirectRunner. For Flink Runner I am > also not able to read from topic. > > I am using kafka 2.5.0 and started the broker by following > https://kafka.apache.org/quickstart - using default > config/server.properties. > > Everything runs locally, and I verified that I can publish&consume from > that topic using confluent_kafka library. > > -- > Best regards, > Piotr >
Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
Is it possible that "'localhost:9092'" is not available from the Docker environment where the Flink step is executed from ? Can you try specifying the actual IP address of the node running the Kafka broker ? On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik wrote: > +dev +Chamikara Jayalath > +Heejong > Lee > > On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk > wrote: > >> I am unable to read from Kafka and getting the following warnings & >> errors when calling kafka.ReadFromKafka() (Python SDK): >> >> WARNING:root:severity: WARN >> timestamp { >> seconds: 1591370012 >> nanos: 52300 >> } >> message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 >> could not be established. Broker may not be available." >> log_location: "org.apache.kafka.clients.NetworkClient" >> thread: "18" >> >> Finally the pipeline fails with: >> >> RuntimeError: org.apache.beam.sdk.util.UserCodeException: >> java.lang.RuntimeException: >> org.apache.kafka.common.errors.TimeoutException: Timeout expired while >> fetching topic metadata >> >> See more complete log attached. >> >> The relevant code snippet: >> >> consumer_conf = {"bootstrap.servers": 'localhost:9092'} >> ... >> kafka.ReadFromKafka( >> consumer_config=consumer_conf, >> topics=[args.topic], >> ) >> ... >> >> Also see full python script attached. >> >> I am using Beam Version: 2.21.0 and DirectRunner. For Flink Runner I am >> also not able to read from topic. >> >> I am using kafka 2.5.0 and started the broker by following >> https://kafka.apache.org/quickstart - using default >> config/server.properties. >> >> Everything runs locally, and I verified that I can publish&consume from >> that topic using confluent_kafka library. >> >> -- >> Best regards, >> Piotr >> >
RE: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
Is Kafka itself running inside another container? If so inspect that container and see if it has a network alias and add that alias to your /etc/hosts file and map it to 127.0.0.1. From: Chamikara Jayalath Sent: Friday, June 5, 2020 2:58 PM To: Luke Cwik Cc: user ; dev ; Heejong Lee Subject: Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata Is it possible that "'localhost:9092'" is not available from the Docker environment where the Flink step is executed from ? Can you try specifying the actual IP address of the node running the Kafka broker ? On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik mailto:lc...@google.com> > wrote: +dev <mailto:d...@beam.apache.org> +Chamikara Jayalath <mailto:chamik...@google.com> +Heejong Lee <mailto:heej...@google.com> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk mailto:piotr.filip...@gmail.com> > wrote: I am unable to read from Kafka and getting the following warnings & errors when calling kafka.ReadFromKafka() (Python SDK): WARNING:root:severity: WARN timestamp { seconds: 1591370012 nanos: 52300 } message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 could not be established. Broker may not be available." log_location: "org.apache.kafka.clients.NetworkClient" thread: "18" Finally the pipeline fails with: RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata See more complete log attached. The relevant code snippet: consumer_conf = {"bootstrap.servers": 'localhost:9092'} ... kafka.ReadFromKafka( consumer_config=consumer_conf, topics=[args.topic], ) ... Also see full python script attached. I am using Beam Version: 2.21.0 and DirectRunner. For Flink Runner I am also not able to read from topic. I am using kafka 2.5.0 and started the broker by following https://kafka.apache.org/quickstart - using default config/server.properties. Everything runs locally, and I verified that I can publish&consume from that topic using confluent_kafka library. -- Best regards, Piotr
Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
Thank you for the suggestions. Neither Kafka nor Flink run in a docker container, they all run locally. Furthermore, the same issue happens for Direct Runner. That being said changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a different error, see attached. On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy wrote: > Is Kafka itself running inside another container? If so inspect that > container and see if it has a network alias and add that alias to your > /etc/hosts file and map it to 127.0.0.1. > > > > *From:* Chamikara Jayalath > *Sent:* Friday, June 5, 2020 2:58 PM > *To:* Luke Cwik > *Cc:* user ; dev ; Heejong Lee > > *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while fetching > topic metadata > > > > Is it possible that "'localhost:9092'" is not available from the Docker > environment where the Flink step is executed from ? Can you try specifying > the actual IP address of the node running the Kafka broker ? > > > > On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik wrote: > > +dev +Chamikara Jayalath > +Heejong > Lee > > > > On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk > wrote: > > I am unable to read from Kafka and getting the following warnings & errors > when calling kafka.ReadFromKafka() (Python SDK): > > > > WARNING:root:severity: WARN > timestamp { > seconds: 1591370012 > nanos: 52300 > } > message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 > could not be established. Broker may not be available." > log_location: "org.apache.kafka.clients.NetworkClient" > thread: "18" > > > > Finally the pipeline fails with: > > > > RuntimeError: org.apache.beam.sdk.util.UserCodeException: > java.lang.RuntimeException: > org.apache.kafka.common.errors.TimeoutException: Timeout expired while > fetching topic metadata > > > > See more complete log attached. > > > > The relevant code snippet: > > > > consumer_conf = {"bootstrap.servers": 'localhost:9092'} > > ... > > kafka.ReadFromKafka( > consumer_config=consumer_conf, > topics=[args.topic], > > ) > > ... > > > > Also see full python script attached. > > > > I am using Beam Version: 2.21.0 and DirectRunner. For Flink Runner I am > also not able to read from topic. > > > > I am using kafka 2.5.0 and started the broker by following > https://kafka.apache.org/quickstart - using default > config/server.properties. > > > > Everything runs locally, and I verified that I can publish&consume from > that topic using confluent_kafka library. > > > > -- > > Best regards, > Piotr > > -- Best regards, Piotr beam01.log Description: Binary data
Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
eam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n" log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient" On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk wrote: > Thank you for the suggestions. > > Neither Kafka nor Flink run in a docker container, they all run locally. > Furthermore, the same issue happens for Direct Runner. That being said > changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a different > error, see attached. > > On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy > wrote: > >> Is Kafka itself running inside another container? If so inspect that >> container and see if it has a network alias and add that alias to your >> /etc/hosts file and map it to 127.0.0.1. >> >> >> >> *From:* Chamikara Jayalath >> *Sent:* Friday, June 5, 2020 2:58 PM >> *To:* Luke Cwik >> *Cc:* user ; dev ; Heejong >> Lee >> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while fetching >> topic metadata >> >> >> >> Is it possible that "'localhost:9092'" is not available from the Docker >> environment where the Flink step is executed from ? Can you try specifying >> the actual IP address of the node running the Kafka broker ? >> >> >> >> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik wrote: >> >> +dev +Chamikara Jayalath >> +Heejong >> Lee >> >> >> >> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk >> wrote: >> >> I am unable to read from Kafka and getting the following warnings
Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
; java.lang.ClassLoader.loadClass(ClassLoader.java:419)\n\tat > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat > java.lang.ClassLoader.loadClass(ClassLoader.java:352)\n\tat > org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.(KafkaUnboundedReader.java:478)\n\tat > org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat > org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat > org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n\tat > org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown > Source)\n\tat > org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat > org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat > org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat > org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat > org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown > Source)\n\tat > org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat > org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat > org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat > org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat > org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat > org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat > org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat > org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat > org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat > org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat > org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat > org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat > org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat > org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown > Source)\n\tat > org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat > org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat > org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat > org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat > org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat > org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat > org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat > org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat > java.lang.Thread.run(Thread.java:748)\n" > log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient" > > On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk > wrote: > >> Thank you for the suggestions. >> >> Neither Kafka nor Flink run in a docker container, they all run locally. >> Furthermore, the same issue happens for Direct Runner. That being said >> changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a different >> error, see attached. >> >> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy < >> venkat_pack...@yahoo.com> wrote: >> >>> Is Kafka itself running inside another container? If so inspect that >>> container and see if it has a network alias and add that alias to your >>> /etc/hosts file and map it to 127.0.0.1. >>> >>> &
Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
3:57 PM Piotr Filipiuk >> wrote: >> >>> Thank you for the suggestions. >>> >>> Neither Kafka nor Flink run in a docker container, they all run locally. >>> Furthermore, the same issue happens for Direct Runner. That being said >>> changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a different >>> error, see attached. >>> >>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy < >>> venkat_pack...@yahoo.com> wrote: >>> >>>> Is Kafka itself running inside another container? If so inspect that >>>> container and see if it has a network alias and add that alias to your >>>> /etc/hosts file and map it to 127.0.0.1. >>>> >>>> >>>> >>>> *From:* Chamikara Jayalath >>>> *Sent:* Friday, June 5, 2020 2:58 PM >>>> *To:* Luke Cwik >>>> *Cc:* user ; dev ; Heejong >>>> Lee >>>> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while >>>> fetching topic metadata >>>> >>>> >>>> >>>> Is it possible that "'localhost:9092'" is not available from the Docker >>>> environment where the Flink step is executed from ? Can you try specifying >>>> the actual IP address of the node running the Kafka broker ? >>>> >>>> >>>> >>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik wrote: >>>> >>>> +dev +Chamikara Jayalath >>>> +Heejong >>>> Lee >>>> >>>> >>>> >>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk >>>> wrote: >>>> >>>> I am unable to read from Kafka and getting the following warnings & >>>> errors when calling kafka.ReadFromKafka() (Python SDK): >>>> >>>> >>>> >>>> WARNING:root:severity: WARN >>>> timestamp { >>>> seconds: 1591370012 >>>> nanos: 52300 >>>> } >>>> message: "[Consumer clientId=consumer-2, groupId=] Connection to node >>>> -1 could not be established. Broker may not be available." >>>> log_location: "org.apache.kafka.clients.NetworkClient" >>>> thread: "18" >>>> >>>> >>>> >>>> Finally the pipeline fails with: >>>> >>>> >>>> >>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: >>>> java.lang.RuntimeException: >>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while >>>> fetching topic metadata >>>> >>>> >>>> >>>> See more complete log attached. >>>> >>>> >>>> >>>> The relevant code snippet: >>>> >>>> >>>> >>>> consumer_conf = {"bootstrap.servers": 'localhost:9092'} >>>> >>>> ... >>>> >>>> kafka.ReadFromKafka( >>>> consumer_config=consumer_conf, >>>> topics=[args.topic], >>>> >>>> ) >>>> >>>> ... >>>> >>>> >>>> >>>> Also see full python script attached. >>>> >>>> >>>> >>>> I am using Beam Version: 2.21.0 and DirectRunner. For Flink Runner I am >>>> also not able to read from topic. >>>> >>>> >>>> >>>> I am using kafka 2.5.0 and started the broker by following >>>> https://kafka.apache.org/quickstart - using default >>>> config/server.properties. >>>> >>>> >>>> >>>> Everything runs locally, and I verified that I can publish&consume from >>>> that topic using confluent_kafka library. >>>> >>>> >>>> >>>> -- >>>> >>>> Best regards, >>>> Piotr >>>> >>>> >>> >>> -- >>> Best regards, >>> Piotr >>> >> >> >> -- >> Best regards, >> Piotr >> >
Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
er.processBundle(ProcessBundleHandler.java:294)\n\tat >>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat >>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat >>> java.lang.Thread.run(Thread.java:748)\n" >>> log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient" >>> >>> On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk >>> wrote: >>> >>>> Thank you for the suggestions. >>>> >>>> Neither Kafka nor Flink run in a docker container, they all run >>>> locally. Furthermore, the same issue happens for Direct Runner. That being >>>> said changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a >>>> different error, see attached. >>>> >>>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy < >>>> venkat_pack...@yahoo.com> wrote: >>>> >>>>> Is Kafka itself running inside another container? If so inspect that >>>>> container and see if it has a network alias and add that alias to your >>>>> /etc/hosts file and map it to 127.0.0.1. >>>>> >>>>> >>>>> >>>>> *From:* Chamikara Jayalath >>>>> *Sent:* Friday, June 5, 2020 2:58 PM >>>>> *To:* Luke Cwik >>>>> *Cc:* user ; dev ; Heejong >>>>> Lee >>>>> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while >>>>> fetching topic metadata >>>>> >>>>> >>>>> >>>>> Is it possible that "'localhost:9092'" is not available from the >>>>> Docker environment where the Flink step is executed from ? Can you try >>>>> specifying the actual IP address of the node running the Kafka broker ? >>>>> >>>>> >>>>> >>>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik wrote: >>>>> >>>>> +dev +Chamikara Jayalath >>>>> +Heejong >>>>> Lee >>>>> >>>>> >>>>> >>>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk < >>>>> piotr.filip...@gmail.com> wrote: >>>>> >>>>> I am unable to read from Kafka and getting the following warnings & >>>>> errors when calling kafka.ReadFromKafka() (Python SDK): >>>>> >>>>> >>>>> >>>>> WARNING:root:severity: WARN >>>>> timestamp { >>>>> seconds: 1591370012 >>>>> nanos: 52300 >>>>> } >>>>> message: "[Consumer clientId=consumer-2, groupId=] Connection to node >>>>> -1 could not be established. Broker may not be available." >>>>> log_location: "org.apache.kafka.clients.NetworkClient" >>>>> thread: "18" >>>>> >>>>> >>>>> >>>>> Finally the pipeline fails with: >>>>> >>>>> >>>>> >>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: >>>>> java.lang.RuntimeException: >>>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while >>>>> fetching topic metadata >>>>> >>>>> >>>>> >>>>> See more complete log attached. >>>>> >>>>> >>>>> >>>>> The relevant code snippet: >>>>> >>>>> >>>>> >>>>> consumer_conf = {"bootstrap.servers": 'localhost:9092'} >>>>> >>>>> ... >>>>> >>>>> kafka.ReadFromKafka( >>>>> consumer_config=consumer_conf, >>>>> topics=[args.topic], >>>>> >>>>> ) >>>>> >>>>> ... >>>>> >>>>> >>>>> >>>>> Also see full python script attached. >>>>> >>>>> >>>>> >>>>> I am using Beam Version: 2.21.0 and DirectRunner. For Flink Runner I >>>>> am also not able to read from topic. >>>>> >>>>> >>>>> >>>>> I am using kafka 2.5.0 and started the broker by following >>>>> https://kafka.apache.org/quickstart - using default >>>>> config/server.properties. >>>>> >>>>> >>>>> >>>>> Everything runs locally, and I verified that I can >>>>> publish&consume from that topic using confluent_kafka library. >>>>> >>>>> >>>>> >>>>> -- >>>>> >>>>> Best regards, >>>>> Piotr >>>>> >>>>> >>>> >>>> -- >>>> Best regards, >>>> Piotr >>>> >>> >>> >>> -- >>> Best regards, >>> Piotr >>> >>
Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
:157)\n\tat >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat >>>> java.lang.Thread.run(Thread.java:748)\nCaused by: >>>> java.lang.NoClassDefFoundError: >>>> org/springframework/expression/EvaluationContext\n\tat >>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.(KafkaUnboundedReader.java:478)\n\tat >>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat >>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat >>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\nCaused >>>> by: java.lang.ClassNotFoundException: >>>> org.springframework.expression.EvaluationContext\n\tat >>>> java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat >>>> java.lang.ClassLoader.loadClass(ClassLoader.java:419)\n\tat >>>> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat >>>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)\n\tat >>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.(KafkaUnboundedReader.java:478)\n\tat >>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat >>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat >>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n\tat >>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown >>>> Source)\n\tat >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat >>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown >>>> Source)\n\tat >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat >>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown >>>> Source)\n\tat >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat >>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat >
Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
rg.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat >>>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown >>>>>> Source)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat >>>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat >>>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat >>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat >>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat >>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat >>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat >>>>>> java.lang.Thread.run(Thread.java:748)\n" >>>>>> log_location: >>>>>> "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient" >>>>>> >>>>>> On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk < >>>>>> piotr.filip...@gmail.com> wrote: >>>>>> >>>>>>> Thank you for the suggestions. >>>>>>> >>>>>>> Neither Kafka nor Flink run in a docker container, they all run >>>>>>> locally. Furthermore, the same issue happens for Direct Runner. That >>>>>>> being >>>>>>> said changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a >>>>>>> different error, see attached. >>>>>>> >>>>>>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy < >>>>>>> venkat_pack...@yahoo.com> wrote: >>>>>>> >>>>>>>> Is Kafka itself running inside another container? If so inspect >>>>>>>> that container and see if it has a network alias and add that alias to >>>>>>>> your >>>>>>>> /etc/hosts file and map it to 127.0.0.1. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> *From:* Chamikara Jayalath >>>>>>>> *Sent:* Friday, June 5, 2020 2:58 PM >>>>>>>> *To:* Luke Cwik >>>>>>>> *Cc:* user ; dev ; >>>>>>>> Heejong Lee >>>>>>>> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while >>>>>>>> fetching topic metadata >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Is it possible that "'localhost:9092'" is not available from the >>>>>>>> Docker environment where the Flink step is executed from ? Can you try >>>>>>>> specifying the actual IP address of the node running the Kafka broker ? >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik wrote: >>>>>>>> >>>>>>>> +dev +Chamikara Jayalath >>>>>>>> +Heejong Lee >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk < >>>>>>>> piotr.filip...@gmail.com> wrote: >>>>>>>> >>>>>>>> I am unable to read from Kafka and getting the following warnings & >>>>>>>> errors when calling kafka.ReadFromKafka() (Python SDK): >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> WARNING:root:severity: WARN >>>>>>>> timestamp { >>>>>>>> seconds: 1591370012 >>>>>>>> nanos: 52300 >>>>>>>> } >>>>>>>> message: "[Consumer clientId=consumer-2, groupId=] Connection to >>>>>>>> node -1 could not be established. Broker may not be available." >>>>>>>> log_location: "org.apache.kafka.clients.NetworkClient" >>>>>>>> thread: "18" >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Finally the pipeline fails with: >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: >>>>>>>> java.lang.RuntimeException: >>>>>>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while >>>>>>>> fetching topic metadata >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> See more complete log attached. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> The relevant code snippet: >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> consumer_conf = {"bootstrap.servers": 'localhost:9092'} >>>>>>>> >>>>>>>> ... >>>>>>>> >>>>>>>> kafka.ReadFromKafka( >>>>>>>> consumer_config=consumer_conf, >>>>>>>> topics=[args.topic], >>>>>>>> >>>>>>>> ) >>>>>>>> >>>>>>>> ... >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Also see full python script attached. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> I am using Beam Version: 2.21.0 and DirectRunner. For Flink Runner >>>>>>>> I am also not able to read from topic. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> I am using kafka 2.5.0 and started the broker by following >>>>>>>> https://kafka.apache.org/quickstart - using default >>>>>>>> config/server.properties. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Everything runs locally, and I verified that I can >>>>>>>> publish&consume from that topic using confluent_kafka library. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> >>>>>>>> Best regards, >>>>>>>> Piotr >>>>>>>> >>>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Best regards, >>>>>>> Piotr >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> Best regards, >>>>>> Piotr >>>>>> >>>>> >> >> -- >> Best regards, >> Piotr >> > > > -- > Best regards, > Piotr > -- Best regards, Piotr
Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
r.java:497)\n\tat >>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat >>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown >>>>>>> Source)\n\tat >>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat >>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat >>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat >>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat >>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat >>>>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown >>>>>>> Source)\n\tat >>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat >>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat >>>>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat >>>>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat >>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat >>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat >>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat >>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat >>>>>>> java.lang.Thread.run(Thread.java:748)\n" >>>>>>> log_location: >>>>>>> "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient" >>>>>>> >>>>>>> On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk < >>>>>>> piotr.filip...@gmail.com> wrote: >>>>>>> >>>>>>>> Thank you for the suggestions. >>>>>>>> >>>>>>>> Neither Kafka nor Flink run in a docker container, they all run >>>>>>>> locally. Furthermore, the same issue happens for Direct Runner. That >>>>>>>> being >>>>>>>> said changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a >>>>>>>> different error, see attached. >>>>>>>> >>>>>>>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy < >>>>>>>> venkat_pack...@yahoo.com> wrote: >>>>>>>> >>>>>>>>> Is Kafka itself running inside another container? If so inspect >>>>>>>>> that container and see if it has a network alias and add that alias >>>>>>>>> to your >>>>>>>>> /etc/hosts file and map it to 127.0.0.1. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> *From:* Chamikara Jayalath >>>>>>>>> *Sent:* Friday, June 5, 2020 2:58 PM >>>>>>>>> *To:* Luke Cwik >>>>>>>>> *Cc:* user ; dev ; >>>>>>>>> Heejong Lee >>>>>>>>> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while >>>>>>>>> fetching topic metadata >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> Is it possible that "'localhost:9092'" is not available from the >>>>>>>>> Docker environment where the Flink step is executed from ? Can you try >>>>>>>>> specifying the actual IP address of the node running the Kafka broker >>>>>>>>> ? >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik wrote: >>>>>>>>> >>>>>>>>> +dev +Chamikara Jayalath >>>>>>>>> +Heejong Lee >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk < >>>>>>>>> piotr.filip...@gmail.com> wrote: >>>>>>>>> >>>>>>>>> I am unable to read from Kafka and getting the following warnings >>>>>>>>> & errors when calling kafka.ReadFromKafka() (Python SDK): >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> WARNING:root:severity: WARN >>>>>>>>> timestamp { >>>>>>>>> seconds: 1591370012 >>>>>>>>> nanos: 52300 >>>>>>>>> } >>>>>>>>> message: "[Consumer clientId=consumer-2, groupId=] Connection to >>>>>>>>> node -1 could not be established. Broker may not be available." >>>>>>>>> log_location: "org.apache.kafka.clients.NetworkClient" >>>>>>>>> thread: "18" >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> Finally the pipeline fails with: >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: >>>>>>>>> java.lang.RuntimeException: >>>>>>>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while >>>>>>>>> fetching topic metadata >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> See more complete log attached. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> The relevant code snippet: >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> consumer_conf = {"bootstrap.servers": 'localhost:9092'} >>>>>>>>> >>>>>>>>> ... >>>>>>>>> >>>>>>>>> kafka.ReadFromKafka( >>>>>>>>> consumer_config=consumer_conf, >>>>>>>>> topics=[args.topic], >>>>>>>>> >>>>>>>>> ) >>>>>>>>> >>>>>>>>> ... >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> Also see full python script attached. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> I am using Beam Version: 2.21.0 and DirectRunner. For Flink Runner >>>>>>>>> I am also not able to read from topic. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> I am using kafka 2.5.0 and started the broker by following >>>>>>>>> https://kafka.apache.org/quickstart - using default >>>>>>>>> config/server.properties. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> Everything runs locally, and I verified that I can >>>>>>>>> publish&consume from that topic using confluent_kafka library. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> >>>>>>>>> Best regards, >>>>>>>>> Piotr >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> Best regards, >>>>>>>> Piotr >>>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Best regards, >>>>>>> Piotr >>>>>>> >>>>>> >>> >>> -- >>> Best regards, >>> Piotr >>> >> >> >> -- >> Best regards, >> Piotr >> > > > -- > Best regards, > Piotr >
Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
tat >>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat >>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:419)\n\tat >>>>>>>> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat >>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)\n\tat >>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.(KafkaUnboundedReader.java:478)\n\tat >>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat >>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat >>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n\tat >>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown >>>>>>>> Source)\n\tat >>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat >>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat >>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown >>>>>>>> Source)\n\tat >>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat >>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat >>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat >>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat >>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat >>>>>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown >>>>>>>> Source)\n\tat >>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat >>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat >>>>>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat >>>>>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat >>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat >>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(
Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
ainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat >>>>>>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat >>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat >>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat >>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat >>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat >>>>>>>>> java.lang.Thread.run(Thread.java:748)\nCaused by: >>>>>>>>> java.lang.NoClassDefFoundError: >>>>>>>>> org/springframework/expression/EvaluationContext\n\tat >>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.(KafkaUnboundedReader.java:478)\n\tat >>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat >>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat >>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\nCaused >>>>>>>>> by: java.lang.ClassNotFoundException: >>>>>>>>> org.springframework.expression.EvaluationContext\n\tat >>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat >>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:419)\n\tat >>>>>>>>> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat >>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)\n\tat >>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.(KafkaUnboundedReader.java:478)\n\tat >>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat >>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat >>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n\tat >>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown >>>>>>>>> Source)\n\tat >>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat >>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat >>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown >>>>>>>>> Source)\n\tat >>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat >>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat >>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat >>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessB
Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
cessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat >>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown >>>>>>>>>> Source)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat >>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat >>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat >>>>>>>>>> java.lang.Thread.run(Thread.java:748)\nCaused by: >>>>>>>>>> java.lang.NoClassDefFoundError: >>>>>>>>>> org/springframework/expression/EvaluationContext\n\tat >>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.(KafkaUnboundedReader.java:478)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\nCaused >>>>>>>>>> by: java.lang.ClassNotFoundException: >>>>>>>>>> org.springframework.expression.EvaluationContext\n\tat >>>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat >>>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:419)\n\tat >>>>>>>>>> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat >>>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.(KafkaUnboundedReader.java:478)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown >>>>>>>>>> Source)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat >>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat >>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown >>>>>>>>>> Source)\n\tat >>>>>>>>>>