So I actually discovered what the problem is after some extensive debugging with Confluent.

 

My Kafka producer was attempting to send a record to a specific topic partition, and that partition did not exist. So what the error message should have said is something like “Record sent to partition <partition number> in topic <topic name> but this partition does not exist”.
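For anyone who hits the same thing, here is a rough sketch of the kind of check that would have surfaced the problem right away. It is only an illustration: PartitionGuard and validate are hypothetical names I made up, not part of Kafka or Flink, and the partition count is assumed to come from somewhere like KafkaProducer.partitionsFor(topic).size() on the real producer.

```java
import java.util.Optional;

/**
 * Hypothetical helper (not a Kafka or Flink API): checks a target partition
 * against the number of partitions the topic actually has.
 */
public final class PartitionGuard {

    private PartitionGuard() {
    }

    /**
     * Returns a descriptive error message if the partition is out of range,
     * or an empty Optional if the partition is valid.
     *
     * partitionCount is assumed to be looked up by the caller, for example
     * via KafkaProducer.partitionsFor(topic).size().
     */
    public static Optional<String> validate(String topic, int partition, int partitionCount) {
        if (partitionCount <= 0) {
            return Optional.of("Topic " + topic + " has no known partitions");
        }
        if (partition < 0 || partition >= partitionCount) {
            return Optional.of("Record sent to partition " + partition
                    + " in topic " + topic
                    + " but this partition does not exist (valid partitions are 0 to "
                    + (partitionCount - 1) + ")");
        }
        return Optional.empty();
    }
}
```

Running a check like this before handing the record to the producer (or inside a custom partitioner) would fail fast with a readable message instead of waiting for the metadata timeout.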

 

Confluent says they’ll be sending an upstream patch to Apache Kafka to improve the error message.

 

Thanks,
Joe

 

 

From: Becket Qin <becket....@gmail.com>
Date: Thursday, December 10, 2020 at 9:27 AM
To: Joseph Lorenzini <jlorenz...@gohealth.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: FlinkKafkaProducer Fails with "Topic not present in metadata"

 

Hi Joseph, 

 

Thanks for the thorough information. Do you happen to have the trace level logging available? If so, do you mind putting it somewhere so we can take a look?

 

Thanks,

 

Jiangjie (Becket) Qin

 

On Thu, Dec 3, 2020 at 8:55 PM Joseph Lorenzini <jlorenz...@gohealth.com> wrote:

Hi all,

I have a Flink job that uses FlinkKafkaConsumer to consume messages off a Kafka topic and a FlinkKafkaProducer to produce records on a Kafka topic. The consumer works fine. However, the Flink job eventually fails with the following exception.

Caused by: org.apache.kafka.common.errors.TimeoutException: Topic XXX not present in metadata after 60000 ms.

I did find this issue, but it didn't have any details, so I am not sure if it's related or not.

https://issues.apache.org/jira/browse/FLINK-18757

Some details that might be important:

- yes, I verified the topic exists
- The kafka cluster that the flink job is integrating with is the Confluent cloud platform at version 5.5.0. This means it should be compatible with apache kafka 2.5.X.  See here for details https://docs.confluent.io/platform/current/installation/versions-interoperability.html
- ACLs and SASL SSL are turned on
- a springboot app that I wrote (which uses spring kafka) is able to write to this same topic using the same credentials as what the flink job is using
- I am on flink 1.11.2 and using the flink-connector-kafka_2.11 at version 1.11.2.
- I turned on trace logs and verified that metadata requests from the flink job occur and metadata responses from the kafka broker are returned.
- I've set producer semantics to none and disabled checkpointing



Privileged/Confidential Information may be contained in this message. If you are not the addressee indicated in this message (or responsible for delivery of the message to such person), you may not copy or deliver this message to anyone. In such case, you should destroy this message and kindly notify the sender by reply email. Please advise immediately if you or your employer does not consent to Internet email for messages of this kind. Opinions, conclusions and other information in this message that do not relate to the official business of my firm shall be understood as neither given nor endorsed by it.

