[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632074#comment-16632074 ] Jon Lee commented on KAFKA-7403: [~vahid] [~hachikuji] Thank you very much for the patch. One question: Why is the expiration timestamp always based on current timestamp now? Previously, if there was custom commit timestamp provided, it was used to compute the expiration timestamp, instead of current timestamp. > Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682 > - > > Key: KAFKA-7403 > URL: https://issues.apache.org/jira/browse/KAFKA-7403 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Jon Lee >Priority: Blocker > Fix For: 2.1.0 > > > I am currently trying broker upgrade from 0.11 to 2.0 with some patches > including KIP-211/KAFKA-4682. After the upgrade, however, applications with > 0.10.2 Kafka clients failed with the following error: > {code:java} > 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. > org.apache.kafka.common.KafkaException: Unexpected error in commit: The > server experienced an unexpected error when processing the request at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) > ~[kafka-clients-0.10.2.86.jar:?] > {code} > From my reading of the code, it looks like the following happened: > # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets > the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME. > # In the 2.0 broker code, upon receiving an OffsetCommitRequest with > DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the > "expireTimestamp" field of OffsetAndMetadata to None. > # Later in the code path, GroupMetadataManager.offsetCommitValue() expects > OffsetAndMetadata to have a non-empty "expireTimestamp" field if the > inter.broker.protocol.version is < KAFKA_2_1_IV0. > # However, the inter.broker.protocol.version was set to "1.0" prior to the > upgrade, and as a result, the following code in offsetCommitValue() raises an > error because expireTimestamp is None: > {code:java} > value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, > offsetAndMetadata.expireTimestamp.get){code} > > Here is the stack trace for the broker side error > {code:java} > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?] > at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) > ~[kafka_2.11-2.0.0.10.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) > ~[kafka_2.11-2.0.0.10.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324) > ~[kafka_2.11-2.0.0.10.jar:?] > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > ~[scala-library-2.11.12.jar:?] > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > ~[scala-library-2.11.12.jar:?] > at
[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622407#comment-16622407 ] Jon Lee commented on KAFKA-7403: [~vahid] Were you able to reproduce the issue with inter.broker.protocol.version set to < 2.1? I think KafkaApis.handleOffsetCommitRequest() should set expireTimestamp to None only if config.interBrokerProtocolVersion >= KAFKA_2_1_IV0 and offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTON_TIME. Then, GroupMetadataManager.offsetCommitValue() can use V1 if expireTimestmp != None, V2 otherwise. Also, questions about KafkaApis.handleOffsetCommitRequest(). The comments for commitTimestamp and expireTimestamp don't match with the code. For example, {code:java} // commit timestamp is always set to now. {code} but {code:java} commitTimestamp = partitionData.timestamp match { case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp case customTimestamp => customTimestamp },{code} Also, {code:java} // - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp {code} but {code:java} expireTimestamp = offsetCommitRequest.retentionTime match { case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None case retentionTime => Some(currentTimestamp + retentionTime) }{code} > Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682 > - > > Key: KAFKA-7403 > URL: https://issues.apache.org/jira/browse/KAFKA-7403 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Jon Lee >Priority: Major > > I am currently trying broker upgrade from 0.11 to 2.0 with some patches > including KIP-211/KAFKA-4682. After the upgrade, however, applications with > 0.10.2 Kafka clients failed with the following error: > {code:java} > 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. > org.apache.kafka.common.KafkaException: Unexpected error in commit: The > server experienced an unexpected error when processing the request at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) > ~[kafka-clients-0.10.2.86.jar:?] > {code} > From my reading of the code, it looks like the following happened: > # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets > the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME. > # In the 2.0 broker code, upon receiving an OffsetCommitRequest with > DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the > "expireTimestamp" field of OffsetAndMetadata to None. > # Later in the code path, GroupMetadataManager.offsetCommitValue() expects > OffsetAndMetadata to have a non-empty "expireTimestamp" field if the > inter.broker.protocol.version is < KAFKA_2_1_IV0. > # However, the inter.broker.protocol.version was set to "1.0" prior to the > upgrade, and as a result, the following code in offsetCommitValue() raises an > error because expireTimestamp is None: > {code:java} > value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, > offsetAndMetadata.expireTimestamp.get){code} > > Here is the stack trace for the broker side error > {code:java} > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347)
[jira] [Issue Comment Deleted] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jon Lee updated KAFKA-7403: --- Comment: was deleted (was: [~vahid] Below code is from GroupMetadataManager.offsetCommitValue() and it looks to me that the if condition "apiVersion < KAFKA_2_1_IV0" is problematic. When inter.broker.protocol.version is set < 2.1, it will always pick V1 regardless of the existence of expireTimestamp, which may cause the error mentioned above. I think we should get rid of this condition and determine the version solely based on "offsetAndMetadata.expireTimestamp.nonEmpty". What do you think? {code:java} val (version, value) = { if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty) // if an older version of the API is used, or if an explicit expiration is provided, use the older schema (1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1)) else (2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2)) } {code} ) > Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682 > - > > Key: KAFKA-7403 > URL: https://issues.apache.org/jira/browse/KAFKA-7403 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Jon Lee >Priority: Major > > I am currently trying broker upgrade from 0.11 to 2.0 with some patches > including KIP-211/KAFKA-4682. After the upgrade, however, applications with > 0.10.2 Kafka clients failed with the following error: > {code:java} > 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. > org.apache.kafka.common.KafkaException: Unexpected error in commit: The > server experienced an unexpected error when processing the request at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) > ~[kafka-clients-0.10.2.86.jar:?] > {code} > From my reading of the code, it looks like the following happened: > # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets > the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME. > # In the 2.0 broker code, upon receiving an OffsetCommitRequest with > DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the > "expireTimestamp" field of OffsetAndMetadata to None. > # Later in the code path, GroupMetadataManager.offsetCommitValue() expects > OffsetAndMetadata to have a non-empty "expireTimestamp" field if the > inter.broker.protocol.version is < KAFKA_2_1_IV0. > # However, the inter.broker.protocol.version was set to "1.0" prior to the > upgrade, and as a result, the following code in offsetCommitValue() raises an > error because expireTimestamp is None: > {code:java} > value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, > offsetAndMetadata.expireTimestamp.get){code} > > Here is the stack trace for the broker side error > {code:java} > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?] > at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) > ~[kafka_2.11-2.0.0.10.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) > ~[kafka_2.11-2.0.0.10.jar:?]
[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618550#comment-16618550 ] Jon Lee commented on KAFKA-7403: [~vahid] Below code is from GroupMetadataManager.offsetCommitValue() and it looks to me that the if condition "apiVersion < KAFKA_2_1_IV0" is problematic. When inter.broker.protocol.version is set < 2.1, it will always pick V1 regardless of the existence of expireTimestamp, which may cause the error mentioned above. I think we should get rid of this condition and determine the version solely based on "offsetAndMetadata.expireTimestamp.nonEmpty". What do you think? {code:java} val (version, value) = { if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty) // if an older version of the API is used, or if an explicit expiration is provided, use the older schema (1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1)) else (2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2)) } {code} > Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682 > - > > Key: KAFKA-7403 > URL: https://issues.apache.org/jira/browse/KAFKA-7403 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Jon Lee >Priority: Major > > I am currently trying broker upgrade from 0.11 to 2.0 with some patches > including KIP-211/KAFKA-4682. After the upgrade, however, applications with > 0.10.2 Kafka clients failed with the following error: > {code:java} > 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. > org.apache.kafka.common.KafkaException: Unexpected error in commit: The > server experienced an unexpected error when processing the request at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) > ~[kafka-clients-0.10.2.86.jar:?] > {code} > From my reading of the code, it looks like the following happened: > # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets > the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME. > # In the 2.0 broker code, upon receiving an OffsetCommitRequest with > DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the > "expireTimestamp" field of OffsetAndMetadata to None. > # Later in the code path, GroupMetadataManager.offsetCommitValue() expects > OffsetAndMetadata to have a non-empty "expireTimestamp" field if the > inter.broker.protocol.version is < KAFKA_2_1_IV0. > # However, the inter.broker.protocol.version was set to "1.0" prior to the > upgrade, and as a result, the following code in offsetCommitValue() raises an > error because expireTimestamp is None: > {code:java} > value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, > offsetAndMetadata.expireTimestamp.get){code} > > Here is the stack trace for the broker side error > {code:java} > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?] > at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) > ~[kafka_2.11-2.0.0.10.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) >
[jira] [Comment Edited] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618458#comment-16618458 ] Jon Lee edited comment on KAFKA-7403 at 9/18/18 4:41 AM: - [~vahid] Did you set inter.broker.protocol.version to something lower than 2.1 for your brokers and commit an offset from a consumer? BTW, it looks to me that the problem is not 0.10.2 consumer specific. Any consumer who would send out an OffsetCommitRequest with the retentionTime field set to DEFAULT_RETENTION_TIME will experience this error. I was able to reproduce the issue with the following setup: * 2.0 broker with KAFKA-4682 patch ** inter.broker.protocol.version = 1.0 * 2.0 client ** enable.auto.commit = true ** auto.commit.interval.ms = 1000 Then the client gets an error when it commits an offset: {code:java} $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --consumer.config config/consumer.properties --topic t --from-beginning : : [2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Asynchronous auto-commit of offsets {t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in commit: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator){code} As mentioned above, to reproduce this in unit tests, pass KAFKA_0_11_0_IV2 instead of ApiVersion.latestVersion as the second parameter of the GroupMetadataManager constructor in GroupMetadataManagerTest.scala. Then, multiple tests including testCommitOffset fail. was (Author: jonlee2): [~vahid] Did you set inter.broker.protocol.version to something lower than 2.1 for your brokers and commit an offset from a consumer? BTW, it looks to me that the problem is not 0.10.2 consumer specific. Any consumer who would send out an OffsetCommitRequest with the retentionTime field set to DEFAULT_RETENTION_TIME will experience this error. I was able to reproduce the issue with the following setup: * 2.0 broker with KAFKA-4682 patch ** inter.broker.protocol.version = 1.0 * 2.0 client ** enable.auto.commit = true ** auto.commit.interval.ms = 1000 Then the client gets an error when it commits an offset: {code:java} $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --consumer.config config/consumer.properties --topic t --from-beginning : : [2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Asynchronous auto-commit of offsets {t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in commit: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator){code} As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as the ApiVersion (the second parameter) to the constructor of GroupMetadataManager in GroupMetadataManagerTest.scala. > Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682 > - > > Key: KAFKA-7403 > URL: https://issues.apache.org/jira/browse/KAFKA-7403 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Jon Lee >Priority: Major > > I am currently trying broker upgrade from 0.11 to 2.0 with some patches > including KIP-211/KAFKA-4682. After the upgrade, however, applications with > 0.10.2 Kafka clients failed with the following error: > {code:java} > 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. > org.apache.kafka.common.KafkaException: Unexpected error in commit: The > server experienced an unexpected error when processing the request at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at >
[jira] [Comment Edited] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618458#comment-16618458 ] Jon Lee edited comment on KAFKA-7403 at 9/18/18 4:32 AM: - [~vahid] Did you set inter.broker.protocol.version to something lower than 2.1 for your brokers and commit an offset from a consumer? BTW, it looks to me that the problem is not 0.10.2 consumer specific. Any consumer who would send out an OffsetCommitRequest with the retentionTime field set to DEFAULT_RETENTION_TIME will experience this error. I was able to reproduce the issue with the following setup: * 2.0 broker with KAFKA-4682 patch ** inter.broker.protocol.version = 1.0 * 2.0 client ** enable.auto.commit = true ** auto.commit.interval.ms = 1000 Then the client gets an error when it commits an offset: {code:java} $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --consumer.config config/consumer.properties --topic t --from-beginning : : [2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Asynchronous auto-commit of offsets {t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in commit: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator){code} As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as the ApiVersion (the second parameter) to the constructor of GroupMetadataManager in GroupMetadataManagerTest.scala. was (Author: jonlee2): [~vahid] It looks to me that the problem is not 0.10.2 consumer specific. Any consumer who would send out an OffsetCommitRequest with the retentionTime field set to DEFAULT_RETENTION_TIME will experience this error. I was able to reproduce the issue with the following setup: * 2.0 broker with KAFKA-4682 patch ** inter.broker.protocol.version = 1.0 * 2.0 client ** enable.auto.commit = true ** auto.commit.interval.ms = 1000 Then the client gets an error when it commits an offset: {code:java} $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --consumer.config config/consumer.properties --topic t --from-beginning : : [2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Asynchronous auto-commit of offsets {t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in commit: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator){code} As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as the ApiVersion (the second parameter) to the constructor of GroupMetadataManager in GroupMetadataManagerTest.scala. > Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682 > - > > Key: KAFKA-7403 > URL: https://issues.apache.org/jira/browse/KAFKA-7403 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Jon Lee >Priority: Major > > I am currently trying broker upgrade from 0.11 to 2.0 with some patches > including KIP-211/KAFKA-4682. After the upgrade, however, applications with > 0.10.2 Kafka clients failed with the following error: > {code:java} > 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. > org.apache.kafka.common.KafkaException: Unexpected error in commit: The > server experienced an unexpected error when processing the request at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) > ~[kafka-clients-0.10.2.86.jar:?] at >
[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618458#comment-16618458 ] Jon Lee commented on KAFKA-7403: [~vahid] It looks to me that the problem is not 0.10.2 consumer specific. Any consumer who would send out an OffsetCommitRequest with the retentionTime field set to DEFAULT_RETENTION_TIME will experience this error. I was able to reproduce the issue with the following setup: * 2.0 broker with KAFKA-4682 patch ** inter.broker.protocol.version = 1.0 * 2.0 client ** enable.auto.commit = true ** auto.commit.interval.ms = 1000 Then the client gets an error when it commits an offset: {code:java} $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --consumer.config config/consumer.properties --topic t --from-beginning : : [2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Asynchronous auto-commit of offsets {t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in commit: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator){code} As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as the ApiVersion (the second parameter) to the constructor of GroupMetadataManager in GroupMetadataManagerTest.scala. > Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682 > - > > Key: KAFKA-7403 > URL: https://issues.apache.org/jira/browse/KAFKA-7403 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Jon Lee >Priority: Major > > I am currently trying broker upgrade from 0.11 to 2.0 with some patches > including KIP-211/KAFKA-4682. After the upgrade, however, applications with > 0.10.2 Kafka clients failed with the following error: > {code:java} > 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. > org.apache.kafka.common.KafkaException: Unexpected error in commit: The > server experienced an unexpected error when processing the request at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) > ~[kafka-clients-0.10.2.86.jar:?] > {code} > From my reading of the code, it looks like the following happened: > # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets > the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME. > # In the 2.0 broker code, upon receiving an OffsetCommitRequest with > DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the > "expireTimestamp" field of OffsetAndMetadata to None. > # Later in the code path, GroupMetadataManager.offsetCommitValue() expects > OffsetAndMetadata to have a non-empty "expireTimestamp" field if the > inter.broker.protocol.version is < KAFKA_2_1_IV0. > # However, the inter.broker.protocol.version was set to "1.0" prior to the > upgrade, and as a result, the following code in offsetCommitValue() raises an > error because
[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612461#comment-16612461 ] Jon Lee commented on KAFKA-7403: [~hachikuji] Thanks for the suggestion. Updated the title and also affected version. > Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682 > - > > Key: KAFKA-7403 > URL: https://issues.apache.org/jira/browse/KAFKA-7403 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Jon Lee >Priority: Major > > I am currently trying broker upgrade from 0.11 to 2.0 with some patches > including KIP-211/KAFKA-4682. After the upgrade, however, applications with > 0.10.2 Kafka clients failed with the following error: > {code:java} > 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. > org.apache.kafka.common.KafkaException: Unexpected error in commit: The > server experienced an unexpected error when processing the request at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) > ~[kafka-clients-0.10.2.86.jar:?] > {code} > From my reading of the code, it looks like the following happened: > # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets > the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME. > # In the 2.0 broker code, upon receiving an OffsetCommitRequest with > DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the > "expireTimestamp" field of OffsetAndMetadata to None. > # Later in the code path, GroupMetadataManager.offsetCommitValue() expects > OffsetAndMetadata to have a non-empty "expireTimestamp" field if the > inter.broker.protocol.version is < KAFKA_2_1_IV0. > # However, the inter.broker.protocol.version was set to "1.0" prior to the > upgrade, and as a result, the following code in offsetCommitValue() raises an > error because expireTimestamp is None: > {code:java} > value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, > offsetAndMetadata.expireTimestamp.get){code} > > Here is the stack trace for the broker side error > {code:java} > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?] > at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) > ~[kafka_2.11-2.0.0.10.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) > ~[kafka_2.11-2.0.0.10.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324) > ~[kafka_2.11-2.0.0.10.jar:?] > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > ~[scala-library-2.11.12.jar:?] > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > ~[scala-library-2.11.12.jar:?] > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > ~[scala-library-2.11.12.jar:?] > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > ~[scala-library-2.11.12.jar:?] > at
[jira] [Updated] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jon Lee updated KAFKA-7403: --- Summary: Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682 (was: Offset commit failure after broker upgrade to 2.0) > Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682 > - > > Key: KAFKA-7403 > URL: https://issues.apache.org/jira/browse/KAFKA-7403 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Jon Lee >Priority: Major > > I am currently trying broker upgrade from 0.11 to 2.0 with some patches > including KIP-211/KAFKA-4682. After the upgrade, however, applications with > 0.10.2 Kafka clients failed with the following error: > {code:java} > 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. > org.apache.kafka.common.KafkaException: Unexpected error in commit: The > server experienced an unexpected error when processing the request at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) > ~[kafka-clients-0.10.2.86.jar:?] > {code} > From my reading of the code, it looks like the following happened: > # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets > the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME. > # In the 2.0 broker code, upon receiving an OffsetCommitRequest with > DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the > "expireTimestamp" field of OffsetAndMetadata to None. > # Later in the code path, GroupMetadataManager.offsetCommitValue() expects > OffsetAndMetadata to have a non-empty "expireTimestamp" field if the > inter.broker.protocol.version is < KAFKA_2_1_IV0. > # However, the inter.broker.protocol.version was set to "1.0" prior to the > upgrade, and as a result, the following code in offsetCommitValue() raises an > error because expireTimestamp is None: > {code:java} > value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, > offsetAndMetadata.expireTimestamp.get){code} > > Here is the stack trace for the broker side error > {code:java} > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?] > at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) > ~[kafka_2.11-2.0.0.10.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) > ~[kafka_2.11-2.0.0.10.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324) > ~[kafka_2.11-2.0.0.10.jar:?] > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > ~[scala-library-2.11.12.jar:?] > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > ~[scala-library-2.11.12.jar:?] > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > ~[scala-library-2.11.12.jar:?] > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > ~[scala-library-2.11.12.jar:?] > at
[jira] [Updated] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jon Lee updated KAFKA-7403: --- Affects Version/s: 2.1.0 > Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682 > - > > Key: KAFKA-7403 > URL: https://issues.apache.org/jira/browse/KAFKA-7403 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Jon Lee >Priority: Major > > I am currently trying broker upgrade from 0.11 to 2.0 with some patches > including KIP-211/KAFKA-4682. After the upgrade, however, applications with > 0.10.2 Kafka clients failed with the following error: > {code:java} > 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. > org.apache.kafka.common.KafkaException: Unexpected error in commit: The > server experienced an unexpected error when processing the request at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) > ~[kafka-clients-0.10.2.86.jar:?] > {code} > From my reading of the code, it looks like the following happened: > # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets > the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME. > # In the 2.0 broker code, upon receiving an OffsetCommitRequest with > DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the > "expireTimestamp" field of OffsetAndMetadata to None. > # Later in the code path, GroupMetadataManager.offsetCommitValue() expects > OffsetAndMetadata to have a non-empty "expireTimestamp" field if the > inter.broker.protocol.version is < KAFKA_2_1_IV0. > # However, the inter.broker.protocol.version was set to "1.0" prior to the > upgrade, and as a result, the following code in offsetCommitValue() raises an > error because expireTimestamp is None: > {code:java} > value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, > offsetAndMetadata.expireTimestamp.get){code} > > Here is the stack trace for the broker side error > {code:java} > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?] > at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) > ~[kafka_2.11-2.0.0.10.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) > ~[kafka_2.11-2.0.0.10.jar:?] > at > kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324) > ~[kafka_2.11-2.0.0.10.jar:?] > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > ~[scala-library-2.11.12.jar:?] > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > ~[scala-library-2.11.12.jar:?] > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > ~[scala-library-2.11.12.jar:?] > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > ~[scala-library-2.11.12.jar:?] > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > ~[scala-library-2.11.12.jar:?] > at >
[jira] [Updated] (KAFKA-7403) Offset commit failure after broker upgrade to 2.0
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jon Lee updated KAFKA-7403: --- Description: I am currently trying broker upgrade from 0.11 to 2.0 with some patches including KIP-211/KAFKA-4682. After the upgrade, however, applications with 0.10.2 Kafka clients failed with the following error: {code:java} 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. org.apache.kafka.common.KafkaException: Unexpected error in commit: The server experienced an unexpected error when processing the request at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) ~[kafka-clients-0.10.2.86.jar:?] {code} >From my reading of the code, it looks like the following happened: # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME. # In the 2.0 broker code, upon receiving an OffsetCommitRequest with DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the "expireTimestamp" field of OffsetAndMetadata to None. # Later in the code path, GroupMetadataManager.offsetCommitValue() expects OffsetAndMetadata to have a non-empty "expireTimestamp" field if the inter.broker.protocol.version is < KAFKA_2_1_IV0. # However, the inter.broker.protocol.version was set to "1.0" prior to the upgrade, and as a result, the following code in offsetCommitValue() raises an error because expireTimestamp is None: {code:java} value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get){code} Here is the stack trace for the broker side error {code:java} java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?] at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?] at kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) ~[kafka_2.11-2.0.0.10.jar:?] at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) ~[kafka_2.11-2.0.0.10.jar:?] at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?] at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) ~[scala-library-2.11.12.jar:?] at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) ~[scala-library-2.11.12.jar:?] at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[scala-library-2.11.12.jar:?] at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?] at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:521) ~[kafka_2.11-2.0.0.10.jar:?] at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506) ~[kafka_2.11-2.0.0.10.jar:?] at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506) ~[kafka_2.11-2.0.0.10.jar:?] at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
[jira] [Created] (KAFKA-7403) Offset commit failure after broker upgrade to 2.0
Jon Lee created KAFKA-7403: -- Summary: Offset commit failure after broker upgrade to 2.0 Key: KAFKA-7403 URL: https://issues.apache.org/jira/browse/KAFKA-7403 Project: Kafka Issue Type: Bug Components: core Reporter: Jon Lee I am currently trying broker upgrade from 0.11 to 2.0 with some patches including KIP-211/KAFKA-4682. After the upgrade, however, applications with 0.10.2 Kafka clients failed with the following error: {code:java} 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. org.apache.kafka.common.KafkaException: Unexpected error in commit: The server experienced an unexpected error when processing the request at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) ~[kafka-clients-0.10.2.86.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) ~[kafka-clients-0.10.2.86.jar:?] {code} >From my reading of the code, it looks like the following happened: # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME. # In the 2.0 broker code, upon receiving an OffsetCommitRequest with DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the "expireTimestamp" field of OffsetAndMetadata to None. # Later in the code path, GroupMetadataManager.offsetCommitValue() expects OffsetAndMetadata to have a non-empty "expireTimestamp" field if the inter.broker.protocol.version is < KAFKA_2_1_IV0. # However, the inter.broker.protocol.version was set to "1.0" prior to the upgrade, and as a result, the following code in offsetCommitValue() raises an error because expireTimestamp is None: {code:java} value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get){code} Here is the stack trace for the broker side error {code:java} java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?] at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?] at kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) ~[kafka_2.11-2.0.0.10.jar:?] at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) ~[kafka_2.11-2.0.0.10.jar:?] at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?] at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) ~[scala-library-2.11.12.jar:?] at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) ~[scala-library-2.11.12.jar:?] at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[scala-library-2.11.12.jar:?] at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?] at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:521) ~[kafka_2.11-2.0.0.10.jar:?] at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506) ~[kafka_2.11-2.0.0.10.jar:?] at
[jira] [Assigned] (KAFKA-6946) Keep the session id for incremental fetch when fetch responses are throttled
[ https://issues.apache.org/jira/browse/KAFKA-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jon Lee reassigned KAFKA-6946: -- Assignee: Jon Lee > Keep the session id for incremental fetch when fetch responses are throttled > - > > Key: KAFKA-6946 > URL: https://issues.apache.org/jira/browse/KAFKA-6946 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.0.0 >Reporter: Jon Lee >Assignee: Jon Lee >Priority: Major > > The current patch for KAFKA-6028 (KIP-219) sends a FetchResponse with > INVALID_SESSION_ID if the response needs to be throttled due to quota > violation. If it is for incremental fetch, this will make the client reset > its session and send a full fetch request next time. This is not a > correctness issue, but it may affect performance when fetches are throttled. > In case of incremental fetch, a throttled response should use the same > session id as before so that the next unthrottled response can be in the same > session. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6944) Add system tests testing the new throttling behavior using older clients/brokers
[ https://issues.apache.org/jira/browse/KAFKA-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jon Lee reassigned KAFKA-6944: -- Assignee: Jon Lee > Add system tests testing the new throttling behavior using older > clients/brokers > > > Key: KAFKA-6944 > URL: https://issues.apache.org/jira/browse/KAFKA-6944 > Project: Kafka > Issue Type: Test > Components: system tests >Affects Versions: 2.0.0 >Reporter: Jon Lee >Assignee: Jon Lee >Priority: Major > > KAFKA-6028 (KIP-219) changes the throttling behavior on quota violation as > follows: > * the broker will send out a response with throttle time to the client > immediately and mute the channel > * upon receiving a response with a non-zero throttle time, the client will > also block sending further requests to the broker until the throttle time is > over. > The current system tests assume that both clients and brokers are of the same > version. We'll need an additional set of quota tests that test throttling > behavior between older clients and newer brokers and between newer clients > and older brokers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6028) Improve the quota throttle communication.
[ https://issues.apache.org/jira/browse/KAFKA-6028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jon Lee reassigned KAFKA-6028: -- Assignee: Jon Lee (was: Jiangjie Qin) > Improve the quota throttle communication. > - > > Key: KAFKA-6028 > URL: https://issues.apache.org/jira/browse/KAFKA-6028 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Affects Versions: 1.0.0 >Reporter: Jiangjie Qin >Assignee: Jon Lee >Priority: Major > Fix For: 2.0.0 > > > Currently if a client is throttled duet to quota violation, the broker will > only send back a response to the clients after the throttle time has passed. > In this case, the clients don't know how long the response will be throttled > and might hit request timeout before the response is returned. As a result > the clients will retry sending a request and results a even longer throttle > time. > The above scenario could happen when a large clients group sending records to > the brokers. We saw this when a MapReduce job pushes data to the Kafka > cluster. > To improve this, the broker can return the response with throttle time > immediately after processing the requests. After that, the broker will mute > the channel for this client. A correct client implementation should back off > for that long before sending the next request. If the client ignored the > throttle time and send the next request immediately, the channel will be > muted and the request won't be processed until the throttle time has passed. > A KIP will follow with more details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-7126) Reduce number of rebalance period for large consumer groups after a topic is created
[ https://issues.apache.org/jira/browse/KAFKA-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jon Lee reassigned KAFKA-7126: -- Assignee: Jon Lee (was: Dong Lin) > Reduce number of rebalance period for large consumer groups after a topic is > created > > > Key: KAFKA-7126 > URL: https://issues.apache.org/jira/browse/KAFKA-7126 > Project: Kafka > Issue Type: Improvement >Reporter: Dong Lin >Assignee: Jon Lee >Priority: Major > Attachments: 1.diff > > > For a group of 200 MirrorMaker consumers with patten-based topic > subscription, a single topic creation caused 50 rebalances for each of these > consumer over 5 minutes period. This causes the MM to significantly lag > behind during this 5 minutes period and the clusters may be considerably > out-of-sync during this period. > Ideally we would like to trigger only 1 rebalance in the MM group after a > topic is created. And conceptually it should be doable. > > Here is the explanation of this repeated consumer rebalance based on the > consumer rebalance logic in the latest Kafka code: > 1) A topic of 10 partitions are created in the cluster and it matches the > subscription pattern of the MM consumers. > 2) The leader of the MM consumer group detects the new topic after metadata > refresh. It triggers rebalance. > 3) At time T0, the first rebalance finishes. 10 consumers are assigned 1 > partition of this topic. The other 190 consumers are not assigned any > partition of this topic. At this moment, the newly created topic will appear > in `ConsumerCoordinator.subscriptions.subscription` for those consumers who > is assigned partition of this consumer or who has refreshed metadata before > time T0. > 4) In the common case, half of the consumers has refreshed metadata before > the leader of the consumer group refreshed metadata. Thus around 100 + 10 = > 110 consumers has the newly created topic in > `ConsumerCoordinator.subscriptions.subscription`. The other 90 consumers do > not have this topic in `ConsumerCoordinator.subscriptions.subscription`. > 5) For those 90 consumers, if any consumer refreshes metadata, it will add > this topic to `ConsumerCoordinator.subscriptions.subscription`, which causes > `ConsumerCoordinator.rejoinNeededOrPending()` to return true and triggers > another rebalance. If a few consumers refresh metadata almost at the same > time, they will jointly trigger one rebalance. Otherwise, they each trigger a > separate rebalance. > 6) The default metadata.max.age.ms is 5 minutes. Thus in the worse case, > which is probably also the average case if number of consumers in the group > is large, the latest consumer will refresh its metadata 5 minutes after T0. > And the rebalance will be repeated during this 5 minutes interval. > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7177) Update 2.0 documentation to reflect changed quota behaviors by KIP-219
Jon Lee created KAFKA-7177: -- Summary: Update 2.0 documentation to reflect changed quota behaviors by KIP-219 Key: KAFKA-7177 URL: https://issues.apache.org/jira/browse/KAFKA-7177 Project: Kafka Issue Type: Task Components: documentation Reporter: Jon Lee KIP-219 changed the way quota violation is communicated between clients and brokers. Documentation should be updated accordingly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6944) Add system tests testing the new throttling behavior using older clients/brokers
[ https://issues.apache.org/jira/browse/KAFKA-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jon Lee resolved KAFKA-6944. Resolution: Fixed > Add system tests testing the new throttling behavior using older > clients/brokers > > > Key: KAFKA-6944 > URL: https://issues.apache.org/jira/browse/KAFKA-6944 > Project: Kafka > Issue Type: Test > Components: system tests >Affects Versions: 2.0.0 >Reporter: Jon Lee >Priority: Major > > KAFKA-6028 (KIP-219) changes the throttling behavior on quota violation as > follows: > * the broker will send out a response with throttle time to the client > immediately and mute the channel > * upon receiving a response with a non-zero throttle time, the client will > also block sending further requests to the broker until the throttle time is > over. > The current system tests assume that both clients and brokers are of the same > version. We'll need an additional set of quota tests that test throttling > behavior between older clients and newer brokers and between newer clients > and older brokers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6946) Keep the session id for incremental fetch when fetch responses are throttled
[ https://issues.apache.org/jira/browse/KAFKA-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jon Lee updated KAFKA-6946: --- Summary: Keep the session id for incremental fetch when fetch responses are throttled (was: Keep the fetch session id for incremental fetch when ) > Keep the session id for incremental fetch when fetch responses are throttled > - > > Key: KAFKA-6946 > URL: https://issues.apache.org/jira/browse/KAFKA-6946 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.0.0 >Reporter: Jon Lee >Priority: Major > > The current patch for KAFKA-6028 (KIP-219) sends a FetchResponse with > INVALID_SESSION_ID if the response needs to be throttled due to quota > violation. If it is for incremental fetch, this will make the client reset > its session and send a full fetch request next time. This is not a > correctness issue, but it may affect performance when fetches are throttled. > In case of incremental fetch, a throttled response should use the same > session id as before so that the next unthrottled response can be in the same > session. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6946) Keep the fetch session id for incremental fetch when
Jon Lee created KAFKA-6946: -- Summary: Keep the fetch session id for incremental fetch when Key: KAFKA-6946 URL: https://issues.apache.org/jira/browse/KAFKA-6946 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 2.0.0 Reporter: Jon Lee The current patch for KAFKA-6028 (KIP-219) sends a FetchResponse with INVALID_SESSION_ID if the response needs to be throttled due to quota violation. If it is for incremental fetch, this will make the client reset its session and send a full fetch request next time. This is not a correctness issue, but it may affect performance when fetches are throttled. In case of incremental fetch, a throttled response should use the same session id as before so that the next unthrottled response can be in the same session. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6944) Add system tests testing the new throttling behavior using older clients/brokers
Jon Lee created KAFKA-6944: -- Summary: Add system tests testing the new throttling behavior using older clients/brokers Key: KAFKA-6944 URL: https://issues.apache.org/jira/browse/KAFKA-6944 Project: Kafka Issue Type: Test Components: system tests Affects Versions: 2.0.0 Reporter: Jon Lee KAFKA-6028 (KIP-219) changes the throttling behavior on quota violation as follows: * the broker will send out a response with throttle time to the client immediately and mute the channel * upon receiving a response with a non-zero throttle time, the client will also block sending further requests to the broker until the throttle time is over. The current system tests assume that both clients and brokers are of the same version. We'll need an additional set of quota tests that test throttling behavior between older clients and newer brokers and between newer clients and older brokers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)