[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626793#comment-16626793
 ] 

ASF GitHub Bot commented on KAFKA-7403:
---------------------------------------

vahidhashemian opened a new pull request #5690: KAFKA-7403: Follow-up fix for 
KAFKA-4682 (KIP-211) to correct some edge cases
URL: https://github.com/apache/kafka/pull/5690
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7403
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7403
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jon Lee
>            Priority: Blocker
>             Fix For: 2.1.0
>
>
> I am currently trying broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>  ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
>  # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets 
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
>  # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
> "expireTimestamp" field of OffsetAndMetadata to None.
>  # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
>  # However, the inter.broker.protocol.version was set to "1.0" prior to the 
> upgrade, and as a result, the following code in offsetCommitValue() raises an 
> error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, 
> offsetAndMetadata.expireTimestamp.get){code}
>  
> Here is the stack trace for the broker side error
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
> ~[scala-library-2.11.12.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:324)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:521)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) 
> ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:193) 
> ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:505)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:484)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:359) 
> ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.server.KafkaApis.handle(KafkaApis.scala:114) 
> ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) 
> ~[kafka_2.11-2.0.0.10.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> {code}
>  
> And I was able to reproduce the error by passing KAFKA_0_11_0_IV2 as the 
> ApiVersion (the second parameter) to the constructor of GroupMetadataManager 
> in GroupMetadataManagerTest.scala.
>  
> [~vahid], the error was from the code added for KAFKA-4682. Can you take a 
> look if this is indeed an issue? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to