[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611393#comment-16611393 ]

Jason Gustafson commented on KAFKA-7403:
----------------------------------------

[~jonlee2] Thanks for the report. Would you mind changing the title to avoid confusion, since KIP-211 was not actually in 2.0?

> Offset commit failure after broker upgrade to 2.0
> -------------------------------------------------
>
>                 Key: KAFKA-7403
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7403
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jon Lee
>            Priority: Major
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches, including KIP-211/KAFKA-4682. After the upgrade, however, applications with 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting.
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The server experienced an unexpected error when processing the request
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
> # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker, with the retentionTime field set to DEFAULT_RETENTION_TIME.
> # On the 2.0 broker, upon receiving an OffsetCommitRequest with DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the "expireTimestamp" field of OffsetAndMetadata to None.
> # Later in the code path, GroupMetadataManager.offsetCommitValue() expects OffsetAndMetadata to have a non-empty "expireTimestamp" field if the inter.broker.protocol.version is < KAFKA_2_1_IV0.
> # However, the inter.broker.protocol.version was set to "1.0" prior to the upgrade, and as a result, the following code in offsetCommitValue() raises an error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get){code}
>
> Here is the stack trace for the broker-side error:
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?]
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
> at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[scala-library-2.11.12.jar:?]
> at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:521) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:193) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:505) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:484) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:359) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.server.KafkaApis.handle(KafkaApis.scala:114) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) ~[kafka_2.11-2.0.0.10.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> {code}
>
> I was able to reproduce the error by passing KAFKA_0_11_0_IV2 as the ApiVersion (the second parameter) to the constructor of GroupMetadataManager in GroupMetadataManagerTest.scala.
>
> [~vahid], the error was from the code added for KAFKA-4682. Can you take a look and see if this is indeed an issue?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
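For illustration, the failure mode described in the steps above can be sketched in isolation. This is a simplified model, not the actual broker source: the case class, method names, and the -1 sentinel for DEFAULT_RETENTION_TIME are assumptions made for the sketch. It shows how a commit with the default retention leaves expireTimestamp as None, which the pre-KAFKA_2_1_IV0 serialization path then dereferences with Option.get:

```scala
// Simplified sketch of the bug (not the real Kafka code): the commit path stores
// expireTimestamp as an Option, leaves it None for the default retention sentinel,
// but the older message-format path still calls .get on it unconditionally.
object OffsetCommitSketch {
  // Sentinel older clients put in the v2 OffsetCommitRequest (assumed -1 here)
  val DefaultRetentionTime: Long = -1L

  final case class OffsetAndMetadata(offset: Long, expireTimestamp: Option[Long])

  // Roughly what KafkaApis.handleOffsetCommitRequest() does in step 2:
  // default retention => no explicit expiry timestamp
  def fromRequest(offset: Long, retentionTimeMs: Long, nowMs: Long): OffsetAndMetadata =
    if (retentionTimeMs == DefaultRetentionTime)
      OffsetAndMetadata(offset, None)
    else
      OffsetAndMetadata(offset, Some(nowMs + retentionTimeMs))

  // Roughly what offsetCommitValue() does for inter.broker.protocol.version < KAFKA_2_1_IV0:
  // the older value schema has a mandatory expire-timestamp field, so it calls .get,
  // which throws java.util.NoSuchElementException("None.get") when the Option is empty.
  def expireFieldV1(om: OffsetAndMetadata): Long =
    om.expireTimestamp.get

  def main(args: Array[String]): Unit = {
    val om = fromRequest(offset = 42L, retentionTimeMs = DefaultRetentionTime, nowMs = 1000L)
    try expireFieldV1(om)
    catch {
      case e: NoSuchElementException =>
        println(s"broker-side failure reproduced: $e")
    }
  }
}
```

Under these assumptions the fix direction is also visible: either the older-format path must fall back to a computed expiry when the Option is None, or the None must never reach that path while the inter.broker.protocol.version is below KAFKA_2_1_IV0.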