[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632659#comment-16632659 ] Vahid Hashemian commented on KAFKA-7403:

[~jonlee2] Thanks a lot for reporting it.

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7403
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7403
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jon Lee
>            Priority: Blocker
>             Fix For: 2.1.0
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches, including KIP-211/KAFKA-4682. After the upgrade, however, applications with 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting.
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The server experienced an unexpected error when processing the request
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
> # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker, setting the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
> # In the 2.0 broker code, upon receiving an OffsetCommitRequest with DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the "expireTimestamp" field of OffsetAndMetadata to None.
> # Later in the code path, GroupMetadataManager.offsetCommitValue() expects OffsetAndMetadata to have a non-empty "expireTimestamp" field if inter.broker.protocol.version is < KAFKA_2_1_IV0.
> # However, inter.broker.protocol.version was set to "1.0" prior to the upgrade, and as a result, the following code in offsetCommitValue() raises an error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get){code}
>
> Here is the stack trace for the broker-side error:
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?]
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
> {code}
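The broker-side `NoSuchElementException: None.get` above is exactly what Scala raises when `.get` is called on `None`. A minimal Java sketch of the failure mode, using `java.util.Optional` to mirror Scala's `Option` (the class and field names here are illustrative, not Kafka's actual API):

```java
import java.util.NoSuchElementException;
import java.util.Optional;

public class ExpireTimestampDemo {
    // The expiration timestamp is absent when the client sent
    // DEFAULT_RETENTION_TIME, mirroring what the 2.0 broker stores
    // in OffsetAndMetadata per the analysis above.
    static Optional<Long> expireTimestamp = Optional.empty();

    public static void main(String[] args) {
        try {
            // Equivalent of offsetAndMetadata.expireTimestamp.get in the v1 write path:
            long ts = expireTimestamp.get();
            System.out.println("expire at " + ts);
        } catch (NoSuchElementException e) {
            // This is the broker-side failure from the stack trace (None.get).
            System.out.println("caught NoSuchElementException");
        }
    }
}
```

The key point is that the serialization path unconditionally dereferences an optional field, so any code path that leaves it empty crashes the offset commit.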
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632074#comment-16632074 ] Jon Lee commented on KAFKA-7403:

[~vahid] [~hachikuji] Thank you very much for the patch. One question: why is the expiration timestamp now always based on the current timestamp? Previously, if a custom commit timestamp was provided, it was used to compute the expiration timestamp instead of the current timestamp.
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627571#comment-16627571 ] ASF GitHub Bot commented on KAFKA-7403:

hachikuji closed pull request #5690: KAFKA-7403: Follow-up fix for KAFKA-4682 (KIP-211) to correct some edge cases
URL: https://github.com/apache/kafka/pull/5690

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

{code}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 8cf99fcc438..dba8b4e1810 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -40,7 +40,7 @@
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.Type._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse}
+import org.apache.kafka.common.requests.{IsolationLevel, OffsetCommitRequest, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -1129,7 +1129,7 @@
       value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
       value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
       // version 1 has a non empty expireTimestamp field
-      value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get)
+      value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
     }
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index afbe5b88204..10119c8d632 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -332,11 +332,10 @@
       } else {
         // for version 1 and beyond store offsets in offset manager
-        // commit timestamp is always set to now.
         // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
         // expire timestamp is computed differently for v1 and v2.
         // - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
-        // - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
+        // - If v1 and explicit retention time is provided we calculate expiration timestamp based on that
         // - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5.
         // - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
         val currentTimestamp = time.milliseconds
{code}

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
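The GroupMetadataManager hunk in the merged diff swaps an unconditional `.get` for a defaulted read. A hedged Java sketch of the before/after semantics (`Optional` mirrors Scala's `Option`; the `-1L` sentinel is an assumption standing in for `OffsetCommitRequest.DEFAULT_TIMESTAMP`, and these class/method names are illustrative, not Kafka's):

```java
import java.util.Optional;

public class OffsetCommitValueSketch {
    // Assumed sentinel; in Kafka this role is played by
    // OffsetCommitRequest.DEFAULT_TIMESTAMP.
    static final long DEFAULT_TIMESTAMP = -1L;

    // Before the patch: serializing the v1 expire field throws
    // NoSuchElementException when no expiration was set.
    static long expireFieldBefore(Optional<Long> expireTimestamp) {
        return expireTimestamp.get();
    }

    // After the patch: an absent expiration serializes as the sentinel
    // instead of crashing the commit.
    static long expireFieldAfter(Optional<Long> expireTimestamp) {
        return expireTimestamp.orElse(DEFAULT_TIMESTAMP);
    }

    public static void main(String[] args) {
        System.out.println(expireFieldAfter(Optional.empty()));   // -1
        System.out.println(expireFieldAfter(Optional.of(1700L))); // 1700
    }
}
```

The design choice here is to widen the serialized domain with a sentinel rather than change the v1 record schema, which keeps older brokers able to read the written value.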
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626793#comment-16626793 ] ASF GitHub Bot commented on KAFKA-7403:

vahidhashemian opened a new pull request #5690: KAFKA-7403: Follow-up fix for KAFKA-4682 (KIP-211) to correct some edge cases
URL: https://github.com/apache/kafka/pull/5690

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626774#comment-16626774 ] Vahid Hashemian commented on KAFKA-7403:

[~hachikuji] Thanks for the pointer. I will try to submit a PR with that fix, but please feel free to take over (since you proposed the solution).
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626228#comment-16626228 ] Jason Gustafson commented on KAFKA-7403:

[~vahid] I was looking at this as part of KAFKA-7437. I think when setting the expiration timestamp for v1, we can use `getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)`. The code from old versions appears to already handle this value correctly by using the commit timestamp plus the retention time when loading the cache.
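Jason's point is that older brokers already treat a sentinel expiration as "compute it at load time". A hypothetical Java reconstruction of that load-time fallback (the method, the parameter names, and the `-1L` sentinel are assumptions for illustration, not Kafka's actual code):

```java
public class OffsetLoadSketch {
    // Assumed sentinel mirroring OffsetCommitRequest.DEFAULT_TIMESTAMP.
    static final long DEFAULT_TIMESTAMP = -1L;

    // When the stored v1 expire field is the sentinel ("no explicit
    // expiration"), fall back to commitTimestamp + retention, which is
    // what Jason describes the old load path doing.
    static long effectiveExpiration(long storedExpireTimestamp,
                                    long commitTimestamp,
                                    long retentionMs) {
        return storedExpireTimestamp == DEFAULT_TIMESTAMP
                ? commitTimestamp + retentionMs
                : storedExpireTimestamp;
    }

    public static void main(String[] args) {
        System.out.println(effectiveExpiration(-1L, 1000L, 500L));  // fallback: 1500
        System.out.println(effectiveExpiration(9999L, 1000L, 500L)); // explicit: 9999
    }
}
```

Under this reading, writing the sentinel (rather than failing on `None`) is safe precisely because every reader already interprets it.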
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622697#comment-16622697 ] Vahid Hashemian commented on KAFKA-7403:

[~jonlee2] Thanks for the additional info. I'll try to take a look over the weekend.
[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622407#comment-16622407 ] Jon Lee commented on KAFKA-7403:

[~vahid] Were you able to reproduce the issue with inter.broker.protocol.version set to < 2.1? I think KafkaApis.handleOffsetCommitRequest() should set expireTimestamp to None only if config.interBrokerProtocolVersion >= KAFKA_2_1_IV0 and offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME. Then, GroupMetadataManager.offsetCommitValue() can use V1 if expireTimestamp != None, and V2 otherwise.

Also, some questions about KafkaApis.handleOffsetCommitRequest(): the comments for commitTimestamp and expireTimestamp don't match the code. For example,
{code:java}
// commit timestamp is always set to now.
{code}
but
{code:java}
commitTimestamp = partitionData.timestamp match {
  case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
  case customTimestamp => customTimestamp
},
{code}
Also,
{code:java}
// - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
{code}
but
{code:java}
expireTimestamp = offsetCommitRequest.retentionTime match {
  case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
  case retentionTime => Some(currentTimestamp + retentionTime)
}
{code}
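Jon's proposal above can be sketched as a small standalone model. Everything here is illustrative: the class, the integer IBP ordinals, and the defaultRetentionMs fallback for pre-2.1 brokers are stand-ins I introduce for the sketch, not the real broker types.

```java
import java.util.Optional;

// Hypothetical standalone model of the proposed handling: drop expireTimestamp
// only when the IBP already understands the V2 schema AND the client asked for
// default retention; pick the value-schema version from expireTimestamp alone.
public class ProposedOffsetCommitHandling {
    static final long DEFAULT_RETENTION_TIME = -1L; // sentinel, as in OffsetCommitRequest
    static final int KAFKA_2_1_IV0 = 21;            // hypothetical IBP ordinal

    static Optional<Long> expireTimestamp(int ibp, long retentionTime, long now,
                                          long defaultRetentionMs) {
        if (retentionTime == DEFAULT_RETENTION_TIME) {
            // Pre-2.1 brokers must still serialize a V1 record, so they need a
            // concrete timestamp; the broker-default retention is an assumed fallback.
            return ibp >= KAFKA_2_1_IV0 ? Optional.<Long>empty()
                                        : Optional.of(now + defaultRetentionMs);
        }
        return Optional.of(now + retentionTime);
    }

    // V1 iff an expiration is present, V2 otherwise.
    static short schemaVersion(Optional<Long> expireTimestamp) {
        return expireTimestamp.isPresent() ? (short) 1 : (short) 2;
    }
}
```

With this split, the pre-2.1 path always has a concrete expireTimestamp to write into the V1 schema, so the None.get in offsetCommitValue() can no longer fire.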
[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618550#comment-16618550 ] Jon Lee commented on KAFKA-7403:

[~vahid] The code below is from GroupMetadataManager.offsetCommitValue(), and it looks to me that the condition "apiVersion < KAFKA_2_1_IV0" is problematic. When inter.broker.protocol.version is set to < 2.1, it always picks V1 regardless of whether expireTimestamp is present, which may cause the error mentioned above. I think we should get rid of this condition and determine the version solely based on "offsetAndMetadata.expireTimestamp.nonEmpty". What do you think?
{code:java}
val (version, value) = {
  if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty)
    // if an older version of the API is used, or if an explicit expiration is provided, use the older schema
    (1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1))
  else
    (2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2))
}
{code}
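The failure mode described in the comment above can be modeled in isolation. This is a hypothetical miniature, not the broker code: the class name, the integer IBP ordinal, and the field-serialization stand-in are mine; only the shape of the condition and the `.get` on a missing value come from the quoted source.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

// Miniature of the bug: an IBP below 2.1 forces the V1 schema even when
// expireTimestamp is absent, and serializing the V1 expire field then
// dereferences the missing value, matching the broker's NoSuchElementException.
public class OffsetCommitValueBug {
    static final int KAFKA_2_1_IV0 = 21; // hypothetical IBP ordinal

    // Mirrors: apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty
    static short schemaVersion(int apiVersion, Optional<Long> expireTimestamp) {
        return (apiVersion < KAFKA_2_1_IV0 || expireTimestamp.isPresent())
                ? (short) 1 : (short) 2;
    }

    // Mirrors: value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, expireTimestamp.get)
    static long serializeV1ExpireField(Optional<Long> expireTimestamp) {
        return expireTimestamp.get(); // throws NoSuchElementException when empty
    }
}
```

With a pre-2.1 IBP and an empty expireTimestamp, schemaVersion() selects V1 and serializeV1ExpireField() throws, which is the None.get path in the stack trace.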
[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618458#comment-16618458 ] Jon Lee commented on KAFKA-7403:

[~vahid] It looks to me that the problem is not specific to the 0.10.2 consumer. Any consumer that sends an OffsetCommitRequest with the retentionTime field set to DEFAULT_RETENTION_TIME will hit this error. I was able to reproduce the issue with the following setup:
* 2.0 broker with the KAFKA-4682 patch
** inter.broker.protocol.version = 1.0
* 2.0 client
** enable.auto.commit = true
** auto.commit.interval.ms = 1000

Then the client gets an error when it commits an offset:
{code:java}
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --consumer.config config/consumer.properties --topic t --from-beginning
:
:
[2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Asynchronous auto-commit of offsets {t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in commit: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
{code}
As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as the ApiVersion (the second parameter) to the constructor of GroupMetadataManager in GroupMetadataManagerTest.scala.
[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16616374#comment-16616374 ] Vahid Hashemian commented on KAFKA-7403:

[~jonlee2] Could you please share more on how you reproduce the issue? When I run a v0.10.2 consumer (with the default {{consumer.properties}}) against a v2 broker everything seems to be running smoothly.
[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612461#comment-16612461 ] Jon Lee commented on KAFKA-7403:

[~hachikuji] Thanks for the suggestion. Updated the title and also the affected version.