Re: Kafka streams state store return hostname as unavailable when calling queryMetadataForKey method

2024-05-09 Thread Penumarthi Durga Prasad Chowdary
We upgraded Kafka from version 3.5.1 to 3.7.0.

On Fri, May 10, 2024 at 2:13 AM Sophie Blee-Goldman wrote:

> What version did you upgrade from?
>
> On Wed, May 8, 2024 at 10:32 PM Penumarthi Durga Prasad Chowdary <prasad.penumar...@gmail.com> wrote:
>
> > Hi Team,
> >   I'm utilizing Kafka Streams to handle data from Kafka topics, running
> > multiple instances with the same application ID. This enables distributed
> > processing of Kafka data across these instances. Furthermore, I've
> > implemented state stores with time windows and session windows. To
> > retrieve windows efficiently, I've established a remote query mechanism
> > between Kafka Streams instances. By leveraging the queryMetadataForKey
> > method on streams, I can retrieve the hostname where a specific key was
> > processed and where the corresponding window data resides in the state
> > store.
> >   streams.queryMetadataForKey(storeName, recordKey, new DataKeySerilizer()).activeHost();
> > This functionality had been running smoothly for quite some time, until
> > we upgraded our Kafka and Kafka Streams versions to 3.7.0. Since the
> > upgrade, we've noticed some unexpected behavior that we didn't encounter
> > with the previous versions.
> >
> >    - The queryMetadataForKey method returns "unavailable" as the
> >    hostname, despite two Kafka Streams instances being in a running
> >    state. The issue occurs intermittently, and restarting the Kafka
> >    Streams instances temporarily resolves it; however, the problem
> >    resurfaces after some time.
> >    - Additionally, we've observed that the state stores of the Kafka
> >    Streams instances are no longer evenly distributed as they were
> >    before the upgrade. Previously, if a Kafka topic had 10 partitions
> >    and two Kafka Streams instances were running, these 10 partitions
> >    would be shared evenly between the two instances. This behavior
> >    seems to have changed after the upgrade.
> >
> > When can this issue happen?
> > How can I fix it?
> > I would like to express my gratitude in advance for any assistance
> > provided.
> > --
> >
> > Thanks & Regards,
> > Prasad,
> > 91-9030546248.
> >
>


-- 


Thanks & Regards,
Prasad,
91-9030546248.


Kafka streams state store return hostname as unavailable when calling queryMetadataForKey method

2024-05-08 Thread Penumarthi Durga Prasad Chowdary
Hi Team,
  I'm utilizing Kafka Streams to handle data from Kafka topics, running
multiple instances with the same application ID. This enables distributed
processing of Kafka data across these instances. Furthermore, I've
implemented state stores with time windows and session windows. To retrieve
windows efficiently, I've established a remote query mechanism between
Kafka Streams instances. By leveraging the queryMetadataForKey method on
streams, I can retrieve the hostname where a specific key was processed and
where the corresponding window data resides in the state store.
 streams.queryMetadataForKey(storeName, recordKey, new DataKeySerilizer()).activeHost();
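For context, the remote lookup is wired up roughly as follows (a minimal
sketch; DataKeySerilizer is our own key serializer, while localHostInfo and
the forwarding step are illustrative assumptions rather than the exact
production code):

  import org.apache.kafka.streams.KeyQueryMetadata;
  import org.apache.kafka.streams.state.HostInfo;

  // Resolve which instance hosts the key, then query locally or forward.
  KeyQueryMetadata metadata =
      streams.queryMetadataForKey(storeName, recordKey, new DataKeySerilizer());
  HostInfo activeHost = metadata.activeHost();
  if (activeHost.equals(localHostInfo)) {  // localHostInfo: this instance's application.server (assumed)
      // The key's window data is in this instance's local state store.
  } else {
      // Forward the query to http://activeHost.host():activeHost.port()
  }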
This functionality had been running smoothly for quite some time, until we
upgraded our Kafka and Kafka Streams versions to 3.7.0. Since the upgrade,
we've noticed some unexpected behavior that we didn't encounter with the
previous versions.

   - The queryMetadataForKey method returns "unavailable" as the
   hostname, despite two Kafka Streams instances being in a running state.
   The issue occurs intermittently, and restarting the Kafka Streams
   instances temporarily resolves it; however, the problem resurfaces after
   some time (a defensive lookup is sketched after this list).
   - Additionally, we've observed that the state stores of the Kafka Streams
   instances are no longer evenly distributed as they were before the
   upgrade. Previously, if a Kafka topic had 10 partitions and two Kafka
   Streams instances were running, these 10 partitions would be shared
   evenly between the two instances. This behavior seems to have changed
   after the upgrade.
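
The defensive lookup referenced above: KeyQueryMetadata.NOT_AVAILABLE is the
sentinel queryMetadataForKey returns while the assignment metadata is not
ready (e.g. during a rebalance). This sketch is only illustrative (the retry
bound and backoff are assumptions, not our production values):

  import org.apache.kafka.streams.KeyQueryMetadata;

  // Retry briefly while metadata is NOT_AVAILABLE instead of failing
  // the remote query outright.
  KeyQueryMetadata metadata =
      streams.queryMetadataForKey(storeName, recordKey, new DataKeySerilizer());
  for (int attempt = 0;
       attempt < 5 && KeyQueryMetadata.NOT_AVAILABLE.equals(metadata);
       attempt++) {
      try {
          Thread.sleep(500L); // illustrative backoff
      } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
      }
      metadata = streams.queryMetadataForKey(storeName, recordKey, new DataKeySerilizer());
  }
  if (KeyQueryMetadata.NOT_AVAILABLE.equals(metadata)) {
      throw new IllegalStateException("no active host for key after retries");
  }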

When can this issue happen?
How can I fix it?
I would like to express my gratitude in advance for any assistance provided.
--



Thanks & Regards,
Prasad,
91-9030546248.


Re: Failed to initialize processor KSTREAM-AGGREGATE-0000000001

2024-05-03 Thread Penumarthi Durga Prasad Chowdary
With both Kafka versions 3.5.1 and 3.7.0 we're still encountering persistent
issues; the Kafka Streams library version is aligned with the Kafka version.
Analysis of the logs suggests the problem may occur when a Kafka node
disconnects from the Kafka Streams processes. This suspicion is supported by
the abundance of network messages indicating disconnections, such as:
>
>  org.apache.kafka.clients.NetworkClient
> ThreadName: kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
>   Message: [Consumer clientId=kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9-consumer, groupId=kafka-streams-exec-0-test-store ] Node 102 disconnected.



On Mon, Apr 22, 2024 at 7:16 AM Matthias J. Sax wrote:

> Not sure either, but it sounds like a bug to me. Can you reproduce this
> reliably? What version are you using?
>
> It would be best if you could file a Jira ticket and we can take it from
> there.
>
>
> -Matthias
>
> On 4/21/24 5:38 PM, Penumarthi Durga Prasad Chowdary wrote:
> > Hi,
> > I have an issue in Kafka Streams while constructing state store windows
> > (TimeWindow and SessionWindow). While processing data, the Kafka Streams
> > process intermittently throws the error below:
> > ThreadName: kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
> > TraceID: unknown  CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
> >   Message: stream-client [kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams uncaught exception handler
> > org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-AGGREGATE-0000000001
> >   at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)
> >   at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)
> >   at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)
> >   at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
> > Caused by: java.lang.NullPointerException
> >   at org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46)
> >   at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)
> >   at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)
> >   ... 7 more
> > My understanding is that the state store is null when stateStore.flush()
> > is invoked to send data to the state store, which leads to the above
> > error. This error can be caught inside the Kafka Streams
> > setUncaughtExceptionHandler:
> >   streams.setUncaughtExceptionHandler(throwable -> {
> >       LOGGER.error("Exception in streams", throwable);
> >       return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
> >   });
> > I'm uncertain about the exact reason for this issue. Everything seems
> > to be in order, including the Kafka cluster, and there are no errors in
> > Kafka Streams except for a few logs indicating node disconnections.
> > Is there a better way to handle this error?
> > When can this issue happen?
> > I would like to express my gratitude in advance for any assistance
> > provided.
>


-- 


Thanks & Regards,
Prasad,
91-9030546248.


Failed to initialize processor KSTREAM-AGGREGATE-0000000001

2024-04-21 Thread Penumarthi Durga Prasad Chowdary
Hi,
I have an issue in Kafka Streams while constructing state store windows
(TimeWindow and SessionWindow). While processing data, the Kafka Streams
process intermittently throws the error below:
ThreadName: kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
TraceID: unknown  CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
 Message: stream-client [kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-AGGREGATE-0000000001
  at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)
  at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)
  at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)
  at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)
  at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)
  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
Caused by: java.lang.NullPointerException
  at org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46)
  at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)
  at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)
  ... 7 more
My understanding is that the state store is null when stateStore.flush() is
invoked to send data to the state store, which leads to the above error.
This error can be caught inside the Kafka Streams setUncaughtExceptionHandler:
  streams.setUncaughtExceptionHandler(throwable -> {
      LOGGER.error("Exception in streams", throwable);
      return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
  });
I'm uncertain about the exact reason for this issue. Everything seems to be
in order, including the Kafka cluster, and there are no errors in Kafka
Streams except for a few logs indicating node disconnections.
Is there a better way to handle this error?
When can this issue happen?
I would like to express my gratitude in advance for any assistance provided.
-- 


Thanks & Regards,
Prasad,
91-9030546248.


Reg : Kafka cluster issue

2021-07-07 Thread Penumarthi Durga Prasad Chowdary
Hi,
 I have an environment with a Kafka cluster of 3 brokers and Kafka Streams
processing data from the Kafka topics.
 Kafka and Kafka Streams are both version 2.7.0.
 This works fine for some time; later, Kafka Streams has issues, with the
logs showing the errors below:

   - Execution error java.util.concurrent.ExecutionException:
   org.apache.kafka.common.errors.TimeoutException
   - Rebalance failed. org.apache.kafka.common.errors.DisconnectException
   - Rebalance failed.
   org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The
   coordinator is loading and hence can't process requests.

Whenever we restart the Kafka cluster and Kafka Streams, everything works
again for some time. I am not sure exactly where the problem is, but the
Kafka metrics below show a high error rate for the indicated requests, and
it looks like the Kafka cluster is unstable.

# HELP kafka_network_requestmetrics_errors_total Attribute exposed for management (kafka.network<type=RequestMetrics, name=ErrorsPerSec><>Count)
# TYPE kafka_network_requestmetrics_errors_total untyped
kafka_network_requestmetrics_errors_total{request="ApiVersions",error="NONE",} 510.0
kafka_network_requestmetrics_errors_total{request="Fetch",error="NOT_LEADER_OR_FOLLOWER",} 53.0
kafka_network_requestmetrics_errors_total{request="Fetch",error="FENCED_LEADER_EPOCH",} 334507.0
kafka_network_requestmetrics_errors_total{request="JoinGroup",error="NONE",} 9.0
kafka_network_requestmetrics_errors_total{request="JoinGroup",error="COORDINATOR_LOAD_IN_PROGRESS",} 252.0
kafka_network_requestmetrics_errors_total{request="OffsetForLeaderEpoch",error="UNKNOWN_LEADER_EPOCH",} 42.0
kafka_network_requestmetrics_errors_total{request="JoinGroup",error="NOT_COORDINATOR",} 2.0
kafka_network_requestmetrics_errors_total{request="LeaderAndIsr",error="NONE",} 346.0
kafka_network_requestmetrics_errors_total{request="OffsetForLeaderEpoch",error="NONE",} 62.0
kafka_network_requestmetrics_errors_total{request="FindCoordinator",error="COORDINATOR_NOT_AVAILABLE",} 104.0
kafka_network_requestmetrics_errors_total{request="ListOffsets",error="NOT_LEADER_OR_FOLLOWER",} 15.0
kafka_network_requestmetrics_errors_total{request="SyncGroup",error="UNKNOWN_MEMBER_ID",} 3.0
kafka_network_requestmetrics_errors_total{request="OffsetCommit",error="NONE",} 1883.0
kafka_network_requestmetrics_errors_total{request="Heartbeat",error="NOT_COORDINATOR",} 2.0
kafka_network_requestmetrics_errors_total{request="Metadata",error="NONE",} 1091.0
kafka_network_requestmetrics_errors_total{request="Heartbeat",error="UNKNOWN_MEMBER_ID",} 5.0
kafka_network_requestmetrics_errors_total{request="ListOffsets",error="FENCED_LEADER_EPOCH",} 5.0
kafka_network_requestmetrics_errors_total{request="DeleteRecords",error="NONE",} 756.0
kafka_network_requestmetrics_errors_total{request="OffsetFetch",error="NONE",} 134.0
kafka_network_requestmetrics_errors_total{request="FindCoordinator",error="NONE",} 19.0
kafka_network_requestmetrics_errors_total{request="ListOffsets",error="NONE",} 321.0
kafka_network_requestmetrics_errors_total{request="SyncGroup",error="NONE",} 6.0
kafka_network_requestmetrics_errors_total{request="JoinGroup",error="MEMBER_ID_REQUIRED",} 9.0
kafka_network_requestmetrics_errors_total{request="UpdateMetadata",error="NONE",} 13.0
kafka_network_requestmetrics_errors_total{request="Fetch",error="UNKNOWN_LEADER_EPOCH",} 88.0
kafka_network_requestmetrics_errors_total{request="Fetch",error="NONE",} 16927.0
kafka_network_requestmetrics_errors_total{request="Heartbeat",error="NONE",} 18353.0
kafka_network_requestmetrics_errors_total{request="OffsetForLeaderEpoch",error="UNKNOWN_TOPIC_OR_PARTITION",} 24.0
kafka_network_requestmetrics_errors_total{request="OffsetForLeaderEpoch",error="NOT_LEADER_OR_FOLLOWER",} 17.0
kafka_network_requestmetrics_errors_total{request="Produce",error="NONE",} 4450.0
# HELP jmx_scrape_error Non-zero if this scrape failed.

These are the Kafka configurations I used:
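# Clean up a stale lock file and meta.properties left over from a previous
# run, then start the broker with the overrides below.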

rm -f /var/lib/kafka/kafka-0/.lock;
rm -f /var/lib/kafka/kafka-0/meta.properties;
exec kafka-server-start.sh /opt/kafka/config/server.properties
--override unclean.leader.election.enable=true
--override broker.id=0
--override listeners=PLAINTEXT://\${LOCAL_POD_IP}:9093
--override host.name=#[[${HOSTNAME}]]#
--override advertised.listeners=PLAINTEXT://\${LOCAL_POD_IP}:9093
--override log.dirs=/var/lib/kafka/kafka-0
--override auto.create.topics.enable=true
--override auto.leader.rebalance.enable=true
--override compression.type=producer
--override delete.topic.enable=false
--override offsets.topic.replication.factor=2
--override broker.id.generation.enable=true
--override default.replication.factor=2
--override num.partitions=10
--override log.retention.bytes=536870912000
--override socket.request.max.bytes=1195725856
--override log.retention.hours=360
--override log.roll.hours=360
--override max.message.bytes=5242880
--override zookeeper.ssl.endpoint.identification.algorithm
--override zookeeper.ssl.client.enable=true
--override zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxn

Reg : kafka state store HA

2019-11-17 Thread Penumarthi Durga Prasad Chowdary
Hi,
   I'm working on Kafka Streams to process data and store it into a state
store (persisted to the local file system and to a changelog topic as well).
Two streams instances are started for HA and performance improvement, and
the streams rebalance the topic partitions between them. Here we are facing
an issue with the state store data: the two stream instances each have their
own individual state store, and these are not in sync with each other on the
local file system, though they are in sync with the changelog topic. So when
querying the state store data, it does not return results.
I read this article
https://tech.transferwise.com/achieving-high-availability-with-kafka-streams/
and added num.standby.replicas as 1 and then 2, but it could not resolve the
issue; the configuration used is sketched below.
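
For reference, the standby configuration was applied roughly as below (a
minimal sketch; the application id, bootstrap servers, and endpoint are
placeholders rather than the real values):

  import java.util.Properties;
  import org.apache.kafka.streams.StreamsConfig;

  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");   // placeholder
  // Keep a warm replica of each local state store on another instance so a
  // failover does not have to rebuild the whole store from the changelog.
  props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
  // Each instance advertises its own RPC endpoint for interactive queries.
  props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "this-host:7070"); // placeholder

If I recall correctly, standby replicas only became directly queryable in a
later release (KIP-535, Kafka 2.5); before that, all interactive queries
must be routed to the active host.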
Is there a way to overcome this issue?

Thanks & Regards
Durga Prasad


-- 


Thanks & Regards,
Prasad,
91-9030546248.