Kafka producer and consumer within on sync execution
Hi, I've an usecase to respond to an API call to the client which should happen in sync. But within the api execution, the system A need to publish a kafka message to an different system B and which responds to another kafka topic. The response must be consumed by A and should respond to the client as API response. So, here this problem has a async pub-sub model and also API handling, which is a sync. Any suggestions on implementing this highly appreciated. thanks Rams
Fwd: EndOfStreamException and Client session timed out
-- Forwarded message -- From: Adeel ShahzadDate: Sat, Apr 8, 2017 at 1:43 PM Subject: EndOfStreamException and Client session timed out To: d...@kafka.apache.org Hello, We continuously get EndOfStreamException in zookeeper logs, [2017-04-06 19:15:24,350] WARN EndOfStreamException: Unable to read additional data from client sessionid 0x15b43c712fc03a5, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn) And in the client's (consumer) logs, we get session time out, main-SendThread(localhost:2181) INFO 2017-04-06 21:30:27,823: org.apache.zookeeper.ClientCnxn Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x15b43c712fc03a5, negotiated timeout = 6000 Is it normal behavior ? We are actually amid investigation for the issue and it consumers are unable to read messages from queue. And producers are unable to put in. Thus, the whole process in jammed. What do you suggest? Thanks, Adeel Shahzad
Re: Leader not available error after kafka node goes down
Hi Ali, Try changing the default value for the streams producer retries to something large, since the default is 0 (which means that if a broker is temporarily down, streams would give that error), e.g., : final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, ID); ... props.put(ProducerConfig.RETRIES_CONFIG, 10); Note that the default is now changed in 0.10.2.1 (which is being voted on). While you're there, another important config we changed in 0.10.2.1 is max.poll.interval.ms, so I'd recommend changing that too. This is to avoid rebalancing during long state recoveries: props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); Thanks Eno > On 8 Apr 2017, at 03:54, Ali Akhtarwrote: > > I have a 3 node kafka cluster which is being managed via kubernetes, in > docker containers. > > Recently, one of the 3 nodes went down, and was automatically re-created by > kubernetes. > > However, now whenever I try to consume from one of my Kafka topics, thru > Kafka Streaming, i get the error: > >> 6687 [StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - > Error while fetching metadata with correlation id 1 : > {my_topic=LEADER_NOT_AVAILABLE} > >> org.apache.kafka.streams.errors.StreamsException: Topic not found during > partition assignment: my_topic > > When I tried to re-create the topic via 'kafka-topics.sh --create', I > received: > >> Error while executing topic command : Topic "my_topic" already exists. > > Any ideas what's going on here, and how to have Kafka recover from a node > going down and automatically elect a new leader?
Leader not available error after kafka node goes down
I have a 3 node kafka cluster which is being managed via kubernetes, in docker containers. Recently, one of the 3 nodes went down, and was automatically re-created by kubernetes. However, now whenever I try to consume from one of my Kafka topics, thru Kafka Streaming, i get the error: >6687 [StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1 : {my_topic=LEADER_NOT_AVAILABLE} > org.apache.kafka.streams.errors.StreamsException: Topic not found during partition assignment: my_topic When I tried to re-create the topic via 'kafka-topics.sh --create', I received: > Error while executing topic command : Topic "my_topic" already exists. Any ideas what's going on here, and how to have Kafka recover from a node going down and automatically elect a new leader?