[ https://issues.apache.org/jira/browse/KAFKA-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635723#comment-17635723 ]
Suresh Rukmangathan commented on KAFKA-14184:
---------------------------------------------

A few updates:
# We are using a ProcessorSupplier, and it returns a new instance every time, so the problem of reusing the same instance does not apply.
# After moving to Kafka Streams 3.2.2, we no longer see this exception. There were no other application-side property or design changes, and it has been stable for about a week of continuous testing. So, it might be a good idea to correlate this with any fix that went into the standby-task code and tag this issue to the relevant release notes as the fix, which would help others like me. I lost quite some time chasing this issue and had to keep the application at a single instance to avoid the exception, which also meant stalling the scale testing.

> Kafka streams application crashes due to "UnsupportedOperationException: this
> should not happen: timestamp() is not supported in standby tasks."
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14184
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14184
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: Suresh Rukmangathan
>            Priority: Critical
>
> Kafka streams application is crashing with the following stack trace, with 3
> frames from the app removed that are process/state-store related functions.
> {code:java}
> java.lang.UnsupportedOperationException: this should not happen: timestamp() is not supported in standby tasks.
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.throwUnsupportedOperationExceptionIfStandby(ProcessorContextImpl.java:352)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:328)
> 	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.log(ChangeLoggingKeyValueBytesStore.java:136)
> 	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:78)
> 	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:32)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$4(MeteredKeyValueStore.java:197)
> 	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:197)
> 	at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:120)
> 	// app calls to process & save to state store - 3 frames
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> 	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
> 	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
> {code}
>
> Key Kafka streams application configuration details are as below:
> {code:java}
> {replication.factor=1, num.standby.replicas=1, topology.optimization=all,
> max.request.size=1048576, auto.offset.reset=earliest}
> {code}
>
> If the Kafka Streams replication factor is 1 and standby replicas is 1, is that an
> issue? Do we expect the replication factor to be at least n+1 when
> standby replicas = 1, or is there no relationship?
>
> A couple of additional data points:
> # The crash stopped once I set standby replicas to 0.
> # The crash also stopped once I reduced the application to a single instance
> (one pod, where one pod runs only one instance of the application).
>
> So, is there something incorrect in the way we have configured the
> Kafka Streams application, and/or is Kafka not handling the standby task
> correctly in its state-store replica/standby handling (for example, calling
> put from the standby context when it is not supposed to)?
> We don't want to lose the standby replica, as it helps consumers recover
> faster; setting standby = 0 amounts to not using that feature, and that is
> not an option.
> Is there a way to debug this better by enabling some logs from Kafka (e.g.,
> setting an env var to add more debug logs) to isolate the issue? Not sure if
> this issue is specific to 2.7.0 and things are resolved in later releases.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
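For reference, the configuration quoted in the issue can be sketched with plain java.util.Properties (so it compiles without the Kafka client jars); the application id and bootstrap address are placeholders not present in the report, and the keys are the standard Kafka Streams config names:

```java
import java.util.Properties;

public class StandbyConfigSketch {
    // Sketch of the reporter's Kafka Streams settings from the issue.
    static Properties streamsProps() {
        Properties p = new Properties();
        p.put("application.id", "example-app");       // placeholder
        p.put("bootstrap.servers", "localhost:9092"); // placeholder
        p.put("auto.offset.reset", "earliest");
        p.put("topology.optimization", "all");
        p.put("max.request.size", "1048576");
        // One standby copy of each stateful task, for faster failover.
        p.put("num.standby.replicas", "1");
        // Replication factor of internal (changelog/repartition) topics.
        // This is a separate setting from num.standby.replicas; standbys
        // restore from the changelog, so a factor of 1 leaves that
        // changelog on a single broker.
        p.put("replication.factor", "1");
        return p;
    }

    public static void main(String[] args) {
        Properties p = streamsProps();
        System.out.println(p.getProperty("num.standby.replicas")); // prints "1"
        System.out.println(p.getProperty("replication.factor"));   // prints "1"
    }
}
```

Note that `replication.factor` and `num.standby.replicas` are independent settings, which is consistent with the reporter's question: there is no hard n+1 requirement between them, though running standbys on top of unreplicated changelog topics leaves recovery exposed to a single broker failure.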
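The commenter's first point, that the ProcessorSupplier returns a new instance on every call so no processor state is shared between tasks, follows the pattern below; it is sketched with `java.util.function.Supplier` and a hypothetical `MyProcessor` class rather than the Kafka Streams `ProcessorSupplier` interface, so it runs standalone:

```java
import java.util.function.Supplier;

public class FreshInstanceSupplier {
    // Hypothetical stand-in for a processor holding per-task mutable state.
    static class MyProcessor {
        int count = 0; // per-instance state; must not be shared across tasks
    }

    public static void main(String[] args) {
        // Like ProcessorSupplier#get in Kafka Streams, this returns a NEW
        // instance on every call, so each task gets its own state.
        Supplier<MyProcessor> supplier = MyProcessor::new;
        MyProcessor a = supplier.get();
        MyProcessor b = supplier.get();
        System.out.println(a != b); // prints "true": distinct instances
    }
}
```

Returning a shared instance from the supplier (e.g. `() -> sharedProcessor`) is the misuse the commenter is ruling out here; it is a separate failure mode from the standby-task exception in the stack trace.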