[ https://issues.apache.org/jira/browse/KAFKA-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boyang Chen updated KAFKA-9666:
-------------------------------
Description:

As of today, the producer epoch keeps increasing until it hits Short.MaxValue. The correct behavior at that point is to make another InitProducerId call to obtain a new PID; otherwise, retrying with an epoch of Short.MaxValue throws a fatal exception that eventually kills the producer.

Stream log:

[2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) [2020-03-05 04:25:41,147] ERROR [stream-soak-test-689bb912-b06c-4c42-88e5-d9578f7ebfdf-StreamThread-3] Thread StreamsThread threadId: stream-soak-test-689bb912-b06c-4c42-88e5-d9578f7ebfdf-StreamThread-3
[2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) TaskManager MetadataState:
GlobalMetadata: []
GlobalStores: []
My HostInfo: HostInfo{host='unknown', port=-1}
Cluster(id = null, nodes = [], partitions = [], controller = null)
Active tasks:
  Running:
  Running Partitions:
  New:
  Restoring:
  Restoring Partitions:
  Restored Partitions:
  Suspended:
Standby tasks:
  Running:
  Running Partitions:
  New:
encountered an error processing soak test (org.apache.kafka.streams.StreamsSoakTest)
[2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) org.apache.kafka.streams.errors.StreamsException: stream-thread [stream-soak-test-689bb912-b06c-4c42-88e5-d9578f7ebfdf-StreamThread-3] Failed to rebalance.
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:862)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:749)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
[2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) Caused by: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The server experienced an unexpected error when processing the request.
	at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
	at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:571)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563)
	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
	at java.lang.Thread.run(Thread.java:748)

Broker log:

[2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_broker_i-0d1ef6c9a1a1708f6_server-log) [2020-03-05 04:25:40,885] INFO [Transaction State Manager 1001]: TransactionalId stream-soak-test-1_0 append transaction log for TxnTransitMetadata(producerId=0, producerEpoch=576, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-0, stream-soak-test-logData10MinuteFinalCount-store-changelog-0, stream-soak-test-logData10MinuteSuppressedCount-store-changelog-0, stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-0, stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog-0, stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000025-changelog-0, stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog-0, windowed-node-counts-0), txnStartTimestamp=1583382340885, txnLastUpdateTimestamp=1583382340885) transition failed due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Ongoing), aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the callback (kafka.coordinator.transaction.TransactionStateManager)
[2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_broker_i-0d1ef6c9a1a1708f6_stdout) java.lang.IllegalStateException: Cannot fence producer with epoch equal to Short.MaxValue since this would overflow
	at kafka.coordinator.transaction.TransactionMetadata.prepareFenceProducerEpoch(TransactionMetadata.scala:194)
	at kafka.coordinator.transaction.TransactionCoordinator.kafka$coordinator$transaction$TransactionCoordinator$$prepareInitProduceIdTransit(TransactionCoordinator.scala:216)
	at kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2$$anonfun$apply$1.apply(TransactionCoordinator.scala:143)
	at kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2$$anonfun$apply$1.apply(TransactionCoordinator.scala:143)
	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
	at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
	at kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2.apply(TransactionCoordinator.scala:142)
	at kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2.apply(TransactionCoordinator.scala:138)
	at scala.util.Either$RightProjection.flatMap(Either.scala:522)
	at kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:137)
	at kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:1638)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:135)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
	at java.lang.Thread.run(Thread.java:748)

was:

As of today, the producer epoch keeps increasing until it hits Short.MaxValue. The correct behavior at that point is to make another InitProducerId call to obtain a new PID; otherwise, retrying with an epoch of Short.MaxValue throws a fatal exception that eventually kills the producer.

> Transactional producer Epoch could not be reset
> -----------------------------------------------
>
>                 Key: KAFKA-9666
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9666
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Boyang Chen
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
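To illustrate the fix direction the description proposes, here is a minimal, hypothetical sketch of epoch-bump logic that rolls over to a fresh PID instead of overflowing. This is not Kafka's actual coordinator code; the names `bumpEpoch` and `nextFreePid` are illustrative only.

```java
// Hypothetical sketch: when the next epoch bump would pass Short.MAX_VALUE,
// allocate a fresh producer id (PID) and restart the epoch at 0, instead of
// throwing a fatal overflow error as in the IllegalStateException above.
public class EpochSketch {

    /**
     * Returns {producerId, epoch} after a bump. If the epoch space is
     * exhausted, re-initializes with a new PID at epoch 0.
     */
    static long[] bumpEpoch(long producerId, short epoch, long nextFreePid) {
        if (epoch == Short.MAX_VALUE) {
            // Epoch exhausted: roll to a new PID rather than overflow.
            return new long[] { nextFreePid, 0 };
        }
        return new long[] { producerId, (short) (epoch + 1) };
    }

    public static void main(String[] args) {
        // Normal case: same PID, epoch incremented (576 -> 577).
        long[] normal = bumpEpoch(0L, (short) 576, 1L);
        System.out.println(normal[0] + "," + normal[1]);

        // Exhausted case: new PID allocated, epoch reset to 0.
        long[] rolled = bumpEpoch(0L, Short.MAX_VALUE, 1L);
        System.out.println(rolled[0] + "," + rolled[1]);
    }
}
```

In the normal case the PID is unchanged and the epoch becomes 577; in the exhausted case the caller receives the new PID with epoch 0, matching the "re-initialize a new PID" behavior the description asks for.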