Re: Kafka - FindCoordinator error
Hi Rajib, We can't see the args you're passing the consumer, and the error message indicates the consumer can't find the cluster. Thanks, Liam Clarke-Hutchinson On Fri, 8 May 2020, 3:04 pm Rajib Deb, wrote: > I wanted to check if anyone has faced this issue > > Thanks > Rajib > > From: Rajib Deb > Sent: Sunday, May 3, 2020 9:51 AM > To: users@kafka.apache.org > Subject: Kafka - FindCoordinator error > > Hi > I have written a Python consumer using confluent-kafka package. After few > hours of running the consumer is dying with the below error > > cimpl.KafkaException: > KafkaError{code=_TIMED_OUT,val=-185,str="FindCoordinator response > error: Local: Timed out"} > > Can anyone please help me understand why this is happening > ** > Below is a portion of the code > ** > producer_conf = { > 'bootstrap.servers': 'xxx', > 'security.protocol': 'SASL_SSL', > 'sasl.mechanisms': 'PLAIN', > 'sasl.username': 'x', > 'sasl.password': '', > 'ssl.ca.location':'', > 'ssl.certificate.location': '', > 'queue.buffering.max.messages': 10, > 'queue.buffering.max.ms' : 1000, > 'batch.num.messages': 500 > } > > p = Producer(**producer_conf) > target_topic='xx' > > c = Consumer(kwargs) > source_topic='' > c.subscribe([source_topic]) > while True: > > msg = c.poll(100) #I am consuming from a topic > > if msg is None: > continue > if msg.error(): > logging.error("error occurred during polling topic") > logging.error(msg.error()) > raise KafkaException(msg.error()) > continue > > #logging.info("input msg form topic: ") > #logging.info(msg.value()) > #msgDict = json.loads(msg.value()) # taking msg into dictionary > try: > p.produce(target_topic, msg.value(), callback=delivery_callback) > #the message from the consumed topic is pushed to the target topic > c.commit() #disabled auto commit, manually committing only when > message pushed to the target topic > except BufferError: > sys.stderr.write('%% Local producer queue is full (%d messages > awaiting delivery): try again\n' % > len(p)) > except Exception as e: > print(e) > > p.poll(0) > #sys.stderr.write('%% Waiting for %d deliveries\n' % len(p)) > p.flush() > > Thanks > Rajib >
RE: Kafka - FindCoordinator error
I wanted to check if anyone has faced this issue Thanks Rajib From: Rajib Deb Sent: Sunday, May 3, 2020 9:51 AM To: users@kafka.apache.org Subject: Kafka - FindCoordinator error Hi I have written a Python consumer using confluent-kafka package. After few hours of running the consumer is dying with the below error cimpl.KafkaException: KafkaError{code=_TIMED_OUT,val=-185,str="FindCoordinator response error: Local: Timed out"} Can anyone please help me understand why this is happening ** Below is a portion of the code ** producer_conf = { 'bootstrap.servers': 'xxx', 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'PLAIN', 'sasl.username': 'x', 'sasl.password': '', 'ssl.ca.location':'', 'ssl.certificate.location': '', 'queue.buffering.max.messages': 10, 'queue.buffering.max.ms' : 1000, 'batch.num.messages': 500 } p = Producer(**producer_conf) target_topic='xx' c = Consumer(kwargs) source_topic='' c.subscribe([source_topic]) while True: msg = c.poll(100) #I am consuming from a topic if msg is None: continue if msg.error(): logging.error("error occurred during polling topic") logging.error(msg.error()) raise KafkaException(msg.error()) continue #logging.info("input msg form topic: ") #logging.info(msg.value()) #msgDict = json.loads(msg.value()) # taking msg into dictionary try: p.produce(target_topic, msg.value(), callback=delivery_callback) #the message from the consumed topic is pushed to the target topic c.commit() #disabled auto commit, manually committing only when message pushed to the target topic except BufferError: sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p)) except Exception as e: print(e) p.poll(0) #sys.stderr.write('%% Waiting for %d deliveries\n' % len(p)) p.flush() Thanks Rajib
Kafka - FindCoordinator error
Hi I have written a Python consumer using confluent-kafka package. After few hours of running the consumer is dying with the below error cimpl.KafkaException: KafkaError{code=_TIMED_OUT,val=-185,str="FindCoordinator response error: Local: Timed out"} Can anyone please help me understand why this is happening ** Below is a portion of the code ** producer_conf = { 'bootstrap.servers': 'xxx', 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'PLAIN', 'sasl.username': 'x', 'sasl.password': '', 'ssl.ca.location':'', 'ssl.certificate.location': '', 'queue.buffering.max.messages': 10, 'queue.buffering.max.ms' : 1000, 'batch.num.messages': 500 } p = Producer(**producer_conf) target_topic='xx' c = Consumer(kwargs) source_topic='' c.subscribe([source_topic]) while True: msg = c.poll(100) #I am consuming from a topic if msg is None: continue if msg.error(): logging.error("error occurred during polling topic") logging.error(msg.error()) raise KafkaException(msg.error()) continue #logging.info("input msg form topic: ") #logging.info(msg.value()) #msgDict = json.loads(msg.value()) # taking msg into dictionary try: p.produce(target_topic, msg.value(), callback=delivery_callback) #the message from the consumed topic is pushed to the target topic c.commit() #disabled auto commit, manually committing only when message pushed to the target topic except BufferError: sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p)) except Exception as e: print(e) p.poll(0) #sys.stderr.write('%% Waiting for %d deliveries\n' % len(p)) p.flush() Thanks Rajib