Re: Kafka Streams state store returns hostname as "unavailable" when calling queryMetadataForKey
Kafka was upgraded from version 3.5.1 to 3.7.0.

On Fri, May 10, 2024 at 2:13 AM Sophie Blee-Goldman wrote:
> What version did you upgrade from?
>
> On Wed, May 8, 2024 at 10:32 PM Penumarthi Durga Prasad Chowdary
> <prasad.penumar...@gmail.com> wrote:
> > Hi Team,
> > I'm using Kafka Streams to process data from Kafka topics, running
> > multiple instances with the same application ID so that processing is
> > distributed across them. I've also implemented state stores with time
> > windows and session windows. To retrieve windows efficiently, I've set
> > up a remote query mechanism between the Kafka Streams instances: using
> > the queryMetadataForKey method, I can find the hostname where a specific
> > key was processed and where the corresponding window data resides in the
> > state store.
> >
> >     streams.queryMetadataForKey(storeName, recordKey, new DataKeySerilizer()).activeHost();
> >
> > This worked smoothly for quite some time, until we upgraded our Kafka
> > and Kafka Streams versions to 3.7.0. Since the upgrade, we've noticed
> > some unexpected behavior that we didn't encounter with the previous
> > versions:
> >
> > - The queryMetadataForKey method returns "unavailable" as the hostname,
> >   despite two Kafka Streams instances being in a running state. The
> >   issue occurs intermittently; restarting the Kafka Streams instances
> >   temporarily resolves it, but the problem resurfaces after some time.
> > - The state stores of the Kafka Streams instances are no longer evenly
> >   distributed as they were before the upgrade. Previously, if a Kafka
> >   topic had 10 partitions and two Kafka Streams instances were running,
> >   the 10 partitions would be shared evenly between the two instances.
> >   This behavior seems to have changed after the upgrade.
> >
> > When can this issue happen?
> > How can I fix it?
> > Thanks in advance for any assistance.
> >
> > --
> > Thanks & Regards,
> > Prasad,
> > 91-9030546248.

--
Thanks & Regards,
Prasad,
91-9030546248.
Kafka Streams state store returns hostname as "unavailable" when calling queryMetadataForKey
Hi Team,
I'm using Kafka Streams to process data from Kafka topics, running multiple instances with the same application ID so that processing is distributed across them. I've also implemented state stores with time windows and session windows. To retrieve windows efficiently, I've set up a remote query mechanism between the Kafka Streams instances: using the queryMetadataForKey method, I can find the hostname where a specific key was processed and where the corresponding window data resides in the state store.

    streams.queryMetadataForKey(storeName, recordKey, new DataKeySerilizer()).activeHost();

This worked smoothly for quite some time, until we upgraded our Kafka and Kafka Streams versions to 3.7.0. Since the upgrade, we've noticed some unexpected behavior that we didn't encounter with the previous versions:

- The queryMetadataForKey method returns "unavailable" as the hostname, despite two Kafka Streams instances being in a running state. The issue occurs intermittently; restarting the Kafka Streams instances temporarily resolves it, but the problem resurfaces after some time.
- The state stores of the Kafka Streams instances are no longer evenly distributed as they were before the upgrade. Previously, if a Kafka topic had 10 partitions and two Kafka Streams instances were running, the 10 partitions would be shared evenly between the two instances. This behavior seems to have changed after the upgrade.

When can this issue happen?
How can I fix it?
Thanks in advance for any assistance.

--
Thanks & Regards,
Prasad,
91-9030546248.
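[Editor's note] For context on the "unavailable" hostname: queryMetadataForKey returns the sentinel KeyQueryMetadata.NOT_AVAILABLE (host "unavailable", port -1) while the instance's metadata view is not valid, typically during a rebalance. A minimal sketch of guarding the remote lookup before routing a query — storeName and recordKey follow the post, but a plain String serializer is assumed here in place of the poster's custom DataKeySerilizer:

```java
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.HostInfo;

public class KeyLookup {
    /**
     * Returns the active host for a key, or null if the metadata is not
     * currently available (e.g. mid-rebalance). The caller should back off
     * and retry instead of routing a request to host "unavailable":-1.
     */
    static HostInfo activeHostOrNull(KafkaStreams streams, String storeName,
                                     String recordKey) {
        KeyQueryMetadata metadata = streams.queryMetadataForKey(
                storeName, recordKey, new StringSerializer());
        if (metadata == null || KeyQueryMetadata.NOT_AVAILABLE.equals(metadata)) {
            return null; // metadata not valid yet; retry later
        }
        return metadata.activeHost();
    }
}
```

Checking KafkaStreams#state() for RUNNING before querying, and retrying on the sentinel rather than failing, avoids treating a transient rebalance as an error.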
Re: Failed to initialize processor KSTREAM-AGGREGATE-0000000001
We are on Kafka versions 3.5.1 and 3.7.0 and still encountering persistent issues; the Kafka Streams library is aligned with these Kafka versions. From the logs, the problem may occur when a Kafka node disconnects from the Kafka Streams processes. This suspicion is supported by the abundance of network messages indicating disconnections, such as:

    org.apache.kafka.clients.NetworkClient
    ThreadName: kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
    Message: [Consumer clientId=kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9-consumer, groupId=kafka-streams-exec-0-test-store] Node 102 disconnected.

On Mon, Apr 22, 2024 at 7:16 AM Matthias J. Sax wrote:
> Not sure either, but it sounds like a bug to me. Can you reproduce this
> reliably? What version are you using?
>
> It would be best if you could file a Jira ticket and we can take it from
> there.
>
> -Matthias
>
> On 4/21/24 5:38 PM, Penumarthi Durga Prasad Chowdary wrote:
> > Hi,
> > I have an issue in Kafka Streams while constructing state store windows
> > (TimeWindow and SessionWindow). While processing data, the Kafka Streams
> > process intermittently throws the error below:
> >
> > ThreadName: kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
> > TraceID: unknown  CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
> > Message: stream-client [kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams uncaught exception handler
> > org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-AGGREGATE-0000000001
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)
> >     at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
> > Caused by: java.lang.NullPointerException
> >     at org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46)
> >     at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)
> >     ... 7 more
> >
> > My understanding is that the state store is null at the moment
> > stateStore.flush() is invoked to send data to the store, which leads to
> > the above error. The error can be caught inside the Kafka Streams
> > setUncaughtExceptionHandler:
> >
> >     streams.setUncaughtExceptionHandler(throwable -> {
> >         LOGGER.error("Exception in streams", throwable);
> >         return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
> >     });
> >
> > I'm uncertain about the exact cause. Everything seems to be in order,
> > including the Kafka cluster, and there are no errors in Kafka Streams
> > except for a few logs indicating node disconnections.
> > Is there a better way to handle this error?
> > When can this issue happen?
> > Thanks in advance for any assistance.

--
Thanks & Regards,
Prasad,
91-9030546248.
Failed to initialize processor KSTREAM-AGGREGATE-0000000001
Hi,
I have an issue in Kafka Streams while constructing state store windows (TimeWindow and SessionWindow). While processing data, the Kafka Streams process intermittently throws the error below:

    ThreadName: kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
    TraceID: unknown  CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
    Message: stream-client [kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams uncaught exception handler
    org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-AGGREGATE-0000000001
        at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)
        at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)
        at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)
        at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)
        at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
    Caused by: java.lang.NullPointerException
        at org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46)
        at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)
        ... 7 more

My understanding is that the state store is null at the moment stateStore.flush() is invoked to send data to the store, which leads to the above error.
The error can be caught inside the Kafka Streams setUncaughtExceptionHandler:

    streams.setUncaughtExceptionHandler(throwable -> {
        LOGGER.error("Exception in streams", throwable);
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
    });

I'm uncertain about the exact cause. Everything seems to be in order, including the Kafka cluster, and there are no errors in Kafka Streams except for a few logs indicating node disconnections.
Is there a better way to handle this error?
When can this issue happen?
Thanks in advance for any assistance.

--
Thanks & Regards,
Prasad,
91-9030546248.
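[Editor's note] As a sketch of how the handler above is typically wired up before streams.start() (names are illustrative, not from the post), together with a state listener whose transition log makes it easier to correlate thread replacements with rebalances and the node-disconnect messages:

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

public class StreamsBootstrap {
    static KafkaStreams start(Topology topology, Properties props) {
        KafkaStreams streams = new KafkaStreams(topology, props);

        // Replace only the failed StreamThread instead of shutting down
        // the whole client (REPLACE_THREAD, as in the post).
        streams.setUncaughtExceptionHandler(throwable -> {
            System.err.println("Exception in streams: " + throwable);
            return StreamsUncaughtExceptionHandler
                    .StreamThreadExceptionResponse.REPLACE_THREAD;
        });

        // Log state transitions (e.g. RUNNING -> REBALANCING) so failures
        // can be matched against broker disconnect timestamps.
        streams.setStateListener((newState, oldState) ->
                System.out.println("Streams state: " + oldState + " -> " + newState));

        streams.start();
        return streams;
    }
}
```

Both setters must be called before start(); registering them afterwards throws an IllegalStateException.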
Reg : Kafka cluster issue
Hi,
I have an environment with a Kafka cluster of 3 brokers and Kafka Streams processing data from Kafka topics. Both Kafka and Kafka Streams are version 2.7.0. It works fine for some time, but later Kafka Streams runs into issues, with the logs showing errors such as:

- Execution error: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException
- Rebalance failed: org.apache.kafka.common.errors.DisconnectException
- Rebalance failed: org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The coordinator is loading and hence can't process requests.

Whenever we restart the Kafka cluster and Kafka Streams, it works again for some time. I'm not sure exactly where the problem is, but looking at the Kafka metrics below, the highlighted requests have a high error rate and the Kafka cluster looks unstable (note especially the very high FENCED_LEADER_EPOCH count on Fetch):

    # TYPE kafka_network_requestmetrics_errors_total untyped
    kafka_network_requestmetrics_errors_total{request="ApiVersions",error="NONE"} 510.0
    kafka_network_requestmetrics_errors_total{request="Fetch",error="NOT_LEADER_OR_FOLLOWER"} 53.0
    kafka_network_requestmetrics_errors_total{request="Fetch",error="FENCED_LEADER_EPOCH"} 334507.0
    kafka_network_requestmetrics_errors_total{request="JoinGroup",error="NONE"} 9.0
    kafka_network_requestmetrics_errors_total{request="JoinGroup",error="COORDINATOR_LOAD_IN_PROGRESS"} 252.0
    kafka_network_requestmetrics_errors_total{request="OffsetForLeaderEpoch",error="UNKNOWN_LEADER_EPOCH"} 42.0
    kafka_network_requestmetrics_errors_total{request="JoinGroup",error="NOT_COORDINATOR"} 2.0
    kafka_network_requestmetrics_errors_total{request="LeaderAndIsr",error="NONE"} 346.0
    kafka_network_requestmetrics_errors_total{request="OffsetForLeaderEpoch",error="NONE"} 62.0
    kafka_network_requestmetrics_errors_total{request="FindCoordinator",error="COORDINATOR_NOT_AVAILABLE"} 104.0
    kafka_network_requestmetrics_errors_total{request="ListOffsets",error="NOT_LEADER_OR_FOLLOWER"} 15.0
    kafka_network_requestmetrics_errors_total{request="SyncGroup",error="UNKNOWN_MEMBER_ID"} 3.0
    kafka_network_requestmetrics_errors_total{request="OffsetCommit",error="NONE"} 1883.0
    kafka_network_requestmetrics_errors_total{request="Heartbeat",error="NOT_COORDINATOR"} 2.0
    kafka_network_requestmetrics_errors_total{request="Metadata",error="NONE"} 1091.0
    kafka_network_requestmetrics_errors_total{request="Heartbeat",error="UNKNOWN_MEMBER_ID"} 5.0
    kafka_network_requestmetrics_errors_total{request="ListOffsets",error="FENCED_LEADER_EPOCH"} 5.0
    kafka_network_requestmetrics_errors_total{request="DeleteRecords",error="NONE"} 756.0
    kafka_network_requestmetrics_errors_total{request="OffsetFetch",error="NONE"} 134.0
    kafka_network_requestmetrics_errors_total{request="FindCoordinator",error="NONE"} 19.0
    kafka_network_requestmetrics_errors_total{request="ListOffsets",error="NONE"} 321.0
    kafka_network_requestmetrics_errors_total{request="SyncGroup",error="NONE"} 6.0
    kafka_network_requestmetrics_errors_total{request="JoinGroup",error="MEMBER_ID_REQUIRED"} 9.0
    kafka_network_requestmetrics_errors_total{request="UpdateMetadata",error="NONE"} 13.0
    kafka_network_requestmetrics_errors_total{request="Fetch",error="UNKNOWN_LEADER_EPOCH"} 88.0
    kafka_network_requestmetrics_errors_total{request="Fetch",error="NONE"} 16927.0
    kafka_network_requestmetrics_errors_total{request="Heartbeat",error="NONE"} 18353.0
    kafka_network_requestmetrics_errors_total{request="OffsetForLeaderEpoch",error="UNKNOWN_TOPIC_OR_PARTITION"} 24.0
    kafka_network_requestmetrics_errors_total{request="OffsetForLeaderEpoch",error="NOT_LEADER_OR_FOLLOWER"} 17.0
    kafka_network_requestmetrics_errors_total{request="Produce",error="NONE"} 4450.0
    # HELP jmx_scrape_error Non-zero if this scrape failed.
These are the Kafka configurations used to start each broker (broker 0 shown):

    rm -f /var/lib/kafka/kafka-0/.lock
    rm -f /var/lib/kafka/kafka-0/meta.properties
    exec kafka-server-start.sh /opt/kafka/config/server.properties \
      --override unclean.leader.election.enable=true \
      --override broker.id=0 \
      --override listeners=PLAINTEXT://\${LOCAL_POD_IP}:9093 \
      --override host.name=#[[${HOSTNAME}]]# \
      --override advertised.listeners=PLAINTEXT://\${LOCAL_POD_IP}:9093 \
      --override log.dirs=/var/lib/kafka/kafka-0 \
      --override auto.create.topics.enable=true \
      --override auto.leader.rebalance.enable=true \
      --override compression.type=producer \
      --override delete.topic.enable=false \
      --override offsets.topic.replication.factor=2 \
      --override broker.id.generation.enable=true \
      --override default.replication.factor=2 \
      --override num.partitions=10 \
      --override log.retention.bytes=536870912000 \
      --override socket.request.max.bytes=1195725856 \
      --override log.retention.hours=360 \
      --override log.roll.hours=360 \
      --override max.message.bytes=5242880 \
      --override zookeeper.ssl.endpoint.identification.algorithm \
      --override zookeeper.ssl.client.enable=true \
      --override zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxn
Reg : kafka state store HA
Hi,
We're working with Kafka Streams to process data and store it in a state store (persisted in the local file system, with a changelog topic as well). Two Streams instances are started for HA and better performance, and the topic partitions are rebalanced between them for processing. We're facing an issue with the state store data: the two stream instances each have their own individual state stores, which are not in sync with each other in the local file system, although each is in sync with the changelog topic. So when querying the state store data, it's not giving results. I read this article https://tech.transferwise.com/achieving-high-availability-with-kafka-streams/ and set num.standby.replicas to 1 and then 2, but that didn't resolve the issue.
Is there a way to overcome this issue?

Thanks & Regards
Durga Prasad

--
Thanks & Regards,
Prasad,
91-9030546248.
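[Editor's note] For reference, standby replicas are a Streams-side configuration, and with standbys in place interactive queries can also be served from standby (possibly stale) stores via StoreQueryParameters#enableStaleStores (available since Kafka Streams 2.5). A minimal configuration sketch — the value is illustrative:

    # Keep one warm replica of each state store's task on another instance,
    # so a second instance can serve (possibly stale) queries during failover.
    num.standby.replicas=1

On the query side, something like streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.windowStore()).enableStaleStores()) allows reads from a standby copy. Note that the local directories of two instances are never expected to be byte-for-byte in sync: each instance materializes only the tasks (active plus standby) assigned to it, so out-of-sync local file systems alone do not indicate a fault.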