[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-28 Thread Jon Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16632074#comment-16632074
 ] 

Jon Lee commented on KAFKA-7403:


[~vahid] [~hachikuji] Thank you very much for the patch. 

One question: why is the expiration timestamp now always based on the current 
timestamp? Previously, if a custom commit timestamp was provided, it was used to 
compute the expiration timestamp instead of the current timestamp. 
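To make the question concrete, here is a sketch of the two behaviors as I 
understand them, reusing names from the handler code quoted in my comments below 
(an illustration only, not the actual patch):
{code:java}
// Previous behavior (as described above): a custom commit timestamp in the
// request also shifted the expiration timestamp.
val commitTimestamp = partitionData.timestamp match {
  case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
  case customTimestamp => customTimestamp
}
val expireTimestampBefore = commitTimestamp + retentionTime

// After the patch: the expiration timestamp is always anchored to the broker's
// current time, regardless of any custom commit timestamp in the request.
val expireTimestampAfter = currentTimestamp + retentionTime
{code}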

 

 

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jon Lee
>Priority: Blocker
> Fix For: 2.1.0
>
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The server experienced an unexpected error when processing the request
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
>  # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets 
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
>  # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
> "expireTimestamp" field of OffsetAndMetadata to None.
>  # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
>  # However, the inter.broker.protocol.version was set to "1.0" prior to the 
> upgrade, and as a result, the following code in offsetCommitValue() raises an 
> error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get){code}
>  
> Here is the stack trace for the broker-side error:
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?]
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) ~[scala-library-2.11.12.jar:?]
> {code}

[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-20 Thread Jon Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622407#comment-16622407
 ] 

Jon Lee commented on KAFKA-7403:


[~vahid] Were you able to reproduce the issue with 
inter.broker.protocol.version set to < 2.1? 

 

I think KafkaApis.handleOffsetCommitRequest() should set expireTimestamp to 
None only if config.interBrokerProtocolVersion >= KAFKA_2_1_IV0 and 
offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME. 
Then, GroupMetadataManager.offsetCommitValue() can use V1 if expireTimestamp != 
None, and V2 otherwise.
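A minimal sketch of that proposal (illustrative only, not a tested patch; 
defaultOffsetRetentionMs is a placeholder for the broker's configured retention):
{code:java}
// In KafkaApis.handleOffsetCommitRequest():
expireTimestamp = offsetCommitRequest.retentionTime match {
  case OffsetCommitRequest.DEFAULT_RETENTION_TIME if config.interBrokerProtocolVersion >= KAFKA_2_1_IV0 =>
    None
  case OffsetCommitRequest.DEFAULT_RETENTION_TIME =>
    Some(currentTimestamp + defaultOffsetRetentionMs) // placeholder for the broker default
  case retentionTime =>
    Some(currentTimestamp + retentionTime)
}

// In GroupMetadataManager.offsetCommitValue(), the schema version then follows the field alone:
val version: Short = if (offsetAndMetadata.expireTimestamp.nonEmpty) 1 else 2
{code}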

 

Also, a couple of questions about KafkaApis.handleOffsetCommitRequest(): the 
comments for commitTimestamp and expireTimestamp don't match the code.

For example,
{code:java}
// commit timestamp is always set to now.
{code}
but
{code:java}
commitTimestamp = partitionData.timestamp match {
  case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
  case customTimestamp => customTimestamp
},{code}
 

Also,
{code:java}
// - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
{code}
but
{code:java}
expireTimestamp = offsetCommitRequest.retentionTime match {
  case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
  case retentionTime => Some(currentTimestamp + retentionTime)
}{code}
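For example, with currentTimestamp = 1000, a custom commit timestamp of 400, and 
retentionTime = 100, the comment implies an expiration at 500, while the code 
above produces 1100.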


[jira] [Issue Comment Deleted] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-18 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee updated KAFKA-7403:
---
Comment: was deleted


[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-18 Thread Jon Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618550#comment-16618550
 ] 

Jon Lee commented on KAFKA-7403:


[~vahid] The code below is from GroupMetadataManager.offsetCommitValue(), and it 
looks to me that the if condition "apiVersion < KAFKA_2_1_IV0" is problematic. 
When inter.broker.protocol.version is set to < 2.1, it will always pick V1 
regardless of the existence of expireTimestamp, which may cause the error 
mentioned above.

I think we should get rid of this condition and determine the version solely 
based on "offsetAndMetadata.expireTimestamp.nonEmpty". What do you think? 

 
{code:java}
val (version, value) = {
  if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty)
    // if an older version of the API is used, or if an explicit expiration is provided, use the older schema
    (1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1))
  else
    (2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2))
}
{code}
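For clarity, the change I am proposing would look roughly like this (a sketch, 
not a tested patch):
{code:java}
val (version, value) = {
  if (offsetAndMetadata.expireTimestamp.nonEmpty)
    // an explicit expiration was provided, so the older schema is required
    (1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1))
  else
    (2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2))
}
{code}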
 


[jira] [Comment Edited] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-17 Thread Jon Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618458#comment-16618458
 ] 

Jon Lee edited comment on KAFKA-7403 at 9/18/18 4:41 AM:
-

[~vahid] Did you set inter.broker.protocol.version to something lower than 2.1 
for your brokers and commit an offset from a consumer? 

BTW, it looks to me that the problem is not specific to the 0.10.2 consumer. Any 
consumer that sends an OffsetCommitRequest with the retentionTime field set to 
DEFAULT_RETENTION_TIME will experience this error.

I was able to reproduce the issue with the following setup:
 * 2.0 broker with KAFKA-4682 patch
 ** inter.broker.protocol.version = 1.0
 * 2.0 client
 ** enable.auto.commit = true
 ** auto.commit.interval.ms = 1000
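A minimal consumer matching this setup (a sketch for clarity; the bootstrap 
server and topic are the ones used in the console run below):
{code:java}
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "localhost:9093")
props.put("group.id", "test-consumer-group")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("enable.auto.commit", "true")        // auto-commit issues the failing OffsetCommitRequest
props.put("auto.commit.interval.ms", "1000")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List("t").asJava)
while (true) consumer.poll(100)                // each auto-commit hits the broker-side error below
{code}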

Then the client gets an error when it commits an offset:
{code:java}
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --consumer.config config/consumer.properties --topic t --from-beginning
:
:
[2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Asynchronous auto-commit of offsets {t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in commit: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator){code}
 

As mentioned above, to reproduce this in unit tests, pass KAFKA_0_11_0_IV2 
instead of ApiVersion.latestVersion as the second parameter of the 
GroupMetadataManager constructor in GroupMetadataManagerTest.scala. Then, 
multiple tests including testCommitOffset fail.
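Concretely, the tweak in GroupMetadataManagerTest.scala looks roughly like this 
(a sketch; the surrounding argument list is abbreviated from the 2.0-era setUp 
and may not match exactly):
{code:java}
// In GroupMetadataManagerTest.setUp(), force the pre-2.1 offset message format path:
groupMetadataManager = new GroupMetadataManager(
  0,                 // brokerId
  KAFKA_0_11_0_IV2,  // instead of ApiVersion.latestVersion
  offsetConfig, replicaManager, zkClient, time)
{code}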




[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-17 Thread Jon Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618458#comment-16618458
 ] 

Jon Lee commented on KAFKA-7403:


[~vahid] It looks to me that the problem is not specific to the 0.10.2 consumer. 
Any consumer that sends an OffsetCommitRequest with the retentionTime field set 
to DEFAULT_RETENTION_TIME will experience this error.

I was able to reproduce the issue with the following setup:
 * 2.0 broker with KAFKA-4682 patch
 ** inter.broker.protocol.version = 1.0
 * 2.0 client
 ** enable.auto.commit = true
 ** auto.commit.interval.ms = 1000

Then the client gets an error when it commits an offset:
{code:java}
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --consumer.config config/consumer.properties --topic t --from-beginning
:
:
[2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Asynchronous auto-commit of offsets {t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in commit: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator){code}
 

As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as 
the ApiVersion (the second parameter) to the constructor of 
GroupMetadataManager in GroupMetadataManagerTest.scala.


[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-12 Thread Jon Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612461#comment-16612461
 ] 

Jon Lee commented on KAFKA-7403:


[~hachikuji] Thanks for the suggestion. I updated the title and the Affects 
Version field.


[jira] [Updated] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-12 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee updated KAFKA-7403:
---
Summary: Offset commit failure after upgrading brokers past 
KIP-211/KAFKA-4682  (was: Offset commit failure after broker upgrade to 2.0)


[jira] [Updated] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-12 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee updated KAFKA-7403:
---
Affects Version/s: 2.1.0


[jira] [Updated] (KAFKA-7403) Offset commit failure after broker upgrade to 2.0

2018-09-11 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee updated KAFKA-7403:
---
Description: 
I am currently trying a broker upgrade from 0.11 to 2.0 with some patches 
including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
0.10.2 Kafka clients failed with the following error:
{code:java}
2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
org.apache.kafka.common.KafkaException: Unexpected error in commit: The server experienced an unexpected error when processing the request
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) ~[kafka-clients-0.10.2.86.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) ~[kafka-clients-0.10.2.86.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.2.86.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.2.86.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.2.86.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) ~[kafka-clients-0.10.2.86.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) ~[kafka-clients-0.10.2.86.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) ~[kafka-clients-0.10.2.86.jar:?]
{code}
From my reading of the code, it looks like the following happened:
 # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets the 
retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
 # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
"expireTimestamp" field of OffsetAndMetadata to None.
 # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
inter.broker.protocol.version is < KAFKA_2_1_IV0.
 # However, the inter.broker.protocol.version was set to "1.0" prior to the 
upgrade, and as a result, the following code in offsetCommitValue() raises an 
error because expireTimestamp is None:
{code:java}
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get){code}
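The failing unwrap can be seen in isolation (a standalone snippet, not the 
broker code):
{code:java}
// handleOffsetCommitRequest() stored None for a request carrying DEFAULT_RETENTION_TIME,
// but the v1 schema write unconditionally unwraps the Option:
val expireTimestamp: Option[Long] = None
expireTimestamp.get  // throws java.util.NoSuchElementException: None.get
{code}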

 

Here is the stack trace for the broker-side error:
{code:java}
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
at kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) ~[kafka_2.11-2.0.0.10.jar:?]
at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) ~[kafka_2.11-2.0.0.10.jar:?]
at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) ~[scala-library-2.11.12.jar:?]
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) ~[scala-library-2.11.12.jar:?]
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[scala-library-2.11.12.jar:?]
at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?]
at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:521) ~[kafka_2.11-2.0.0.10.jar:?]
at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506) ~[kafka_2.11-2.0.0.10.jar:?]
at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506) ~[kafka_2.11-2.0.0.10.jar:?]
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
{code}

[jira] [Created] (KAFKA-7403) Offset commit failure after broker upgrade to 2.0

2018-09-11 Thread Jon Lee (JIRA)
Jon Lee created KAFKA-7403:
--

 Summary: Offset commit failure after broker upgrade to 2.0
 Key: KAFKA-7403
 URL: https://issues.apache.org/jira/browse/KAFKA-7403
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jon Lee


I am currently trying broker upgrade from 0.11 to 2.0 with some patches 
including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
0.10.2 Kafka clients failed with the following error:
{code:java}
2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
org.apache.kafka.common.KafkaException: Unexpected error in commit: The server 
experienced an unexpected error when processing the request at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
 ~[kafka-clients-0.10.2.86.jar:?] at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
 ~[kafka-clients-0.10.2.86.jar:?] at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
 ~[kafka-clients-0.10.2.86.jar:?] at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
 ~[kafka-clients-0.10.2.86.jar:?] at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
 ~[kafka-clients-0.10.2.86.jar:?] at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
 ~[kafka-clients-0.10.2.86.jar:?] at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
 ~[kafka-clients-0.10.2.86.jar:?] at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
 ~[kafka-clients-0.10.2.86.jar:?] at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
 ~[kafka-clients-0.10.2.86.jar:?] at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
 ~[kafka-clients-0.10.2.86.jar:?]
{code}
>From my reading of the code, it looks like the following happened:
 # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets the 
retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
 # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
"expireTimestamp" field of OffsetAndMetadata to None.
 # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
inter.broker.protocol.version is < KAFKA_2_1_IV0.
 # However, the inter.broker.protocol.version was set to "1.0" prior to the 
upgrade, and as a result, the following code in offsetCommitValue() raises an 
error because expireTimestamp is None:
{code:java}
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get)
{code}
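For illustration, here is a minimal, self-contained Scala sketch of the kind of fallback that would avoid the None.get. All names and the default retention below are simplified stand-ins, not the actual Kafka types or the eventual patch:
{code:java}
// Hypothetical sketch only: OffsetAndMetadata and the retention default are
// simplified stand-ins for the real broker types.
case class OffsetAndMetadata(offset: Long,
                             commitTimestamp: Long,
                             expireTimestamp: Option[Long])

object ExpireTimestampFallback {
  // assumed broker-side default retention (offsets.retention.minutes); 7 days here
  val defaultRetentionMs: Long = 7L * 24 * 60 * 60 * 1000

  // fall back to commit time + broker default when the client sent
  // DEFAULT_RETENTION_TIME (expireTimestamp == None), instead of calling .get
  def v1ExpireTimestamp(om: OffsetAndMetadata): Long =
    om.expireTimestamp.getOrElse(om.commitTimestamp + defaultRetentionMs)

  def main(args: Array[String]): Unit = {
    val om = OffsetAndMetadata(42L, System.currentTimeMillis(), None)
    println(v1ExpireTimestamp(om)) // no NoSuchElementException
  }
}
{code}
The point is only that the v1 offset schema needs some concrete expiration value, so a missing client-supplied retention has to be replaced by a broker-side default before the field is written.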

 

Here is the stack trace for the broker-side error:
{code:java}
java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
        at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
        at kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) ~[kafka_2.11-2.0.0.10.jar:?]
        at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) ~[kafka_2.11-2.0.0.10.jar:?]
        at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?]
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) ~[scala-library-2.11.12.jar:?]
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) ~[scala-library-2.11.12.jar:?]
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
        at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[scala-library-2.11.12.jar:?]
        at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?]
        at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:521) ~[kafka_2.11-2.0.0.10.jar:?]
        at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506) ~[kafka_2.11-2.0.0.10.jar:?]
        at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506) ~[kafka_2.11-2.0.0.10.jar:?]
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
{code}

[jira] [Assigned] (KAFKA-6946) Keep the session id for incremental fetch when fetch responses are throttled

2018-07-20 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee reassigned KAFKA-6946:
--

Assignee: Jon Lee

> Keep the session id for incremental fetch when fetch responses are throttled 
> -
>
> Key: KAFKA-6946
> URL: https://issues.apache.org/jira/browse/KAFKA-6946
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Jon Lee
>Assignee: Jon Lee
>Priority: Major
>
> The current patch for KAFKA-6028 (KIP-219) sends a FetchResponse with 
> INVALID_SESSION_ID if the response needs to be throttled due to quota 
> violation. If it is for incremental fetch, this will make the client reset 
> its session and send a full fetch request next time. This is not a 
> correctness issue, but it may affect performance when fetches are throttled.
> In case of incremental fetch, a throttled response should use the same 
> session id as before so that the next unthrottled response can be in the same 
> session. 
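
A minimal Scala sketch of the behavior being proposed (assumed names and a toy FetchResponse; the real broker plumbing is more involved):
{code:java}
// Illustrative sketch only: models the session-id decision, not real broker code.
object ThrottledFetchSession {
  val InvalidSessionId: Int = 0 // sentinel that forces the client to start a new session

  case class FetchResponse(sessionId: Int, throttleTimeMs: Int)

  // On quota violation, echo the existing incremental-fetch session id instead
  // of INVALID_SESSION_ID, so the next unthrottled fetch can stay incremental.
  def throttledResponse(currentSessionId: Int, throttleTimeMs: Int): FetchResponse =
    FetchResponse(currentSessionId, throttleTimeMs)

  def main(args: Array[String]): Unit = {
    val resp = throttledResponse(currentSessionId = 12345, throttleTimeMs = 500)
    println(s"session=${resp.sessionId} throttle=${resp.throttleTimeMs}ms")
  }
}
{code}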



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6944) Add system tests testing the new throttling behavior using older clients/brokers

2018-07-20 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee reassigned KAFKA-6944:
--

Assignee: Jon Lee

> Add system tests testing the new throttling behavior using older 
> clients/brokers
> 
>
> Key: KAFKA-6944
> URL: https://issues.apache.org/jira/browse/KAFKA-6944
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Affects Versions: 2.0.0
>Reporter: Jon Lee
>Assignee: Jon Lee
>Priority: Major
>
> KAFKA-6028 (KIP-219) changes the throttling behavior on quota violation as 
> follows:
>  * the broker will send out a response with throttle time to the client 
> immediately and mute the channel
>  * upon receiving a response with a non-zero throttle time, the client will 
> also block sending further requests to the broker until the throttle time is 
> over.
> The current system tests assume that both clients and brokers are of the same 
> version. We'll need an additional set of quota tests that test throttling 
> behavior between older clients and newer brokers and between newer clients 
> and older brokers. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6028) Improve the quota throttle communication.

2018-07-20 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee reassigned KAFKA-6028:
--

Assignee: Jon Lee  (was: Jiangjie Qin)

> Improve the quota throttle communication.
> -
>
> Key: KAFKA-6028
> URL: https://issues.apache.org/jira/browse/KAFKA-6028
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Assignee: Jon Lee
>Priority: Major
> Fix For: 2.0.0
>
>
> Currently, if a client is throttled due to quota violation, the broker will 
> only send back a response to the client after the throttle time has passed. 
> In this case, the client doesn't know how long the response will be delayed 
> and might hit the request timeout before the response is returned. As a 
> result, the client will retry the request, which results in an even longer 
> throttle time.
> The above scenario can happen when a large group of clients is sending 
> records to the brokers. We saw this when a MapReduce job pushed data to the 
> Kafka cluster.
> To improve this, the broker can return the response with the throttle time 
> immediately after processing the request. After that, the broker will mute 
> the channel for this client. A correct client implementation should back off 
> for that long before sending the next request. If the client ignores the 
> throttle time and sends the next request immediately, the channel will 
> remain muted and the request won't be processed until the throttle time has 
> passed.
> A KIP will follow with more details.
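
As a rough Scala sketch of the client-side contract described above (assumed names; a real client also has to cope with the broker-muted channel):
{code:java}
// Toy model of the KIP-219 client contract: after a response carrying
// throttleTimeMs, hold off sending until the throttle window has elapsed.
object ThrottleBackoff {
  def nextSendTimeMs(responseReceivedMs: Long, throttleTimeMs: Int): Long =
    responseReceivedMs + throttleTimeMs

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    val sendAt = nextSendTimeMs(now, throttleTimeMs = 1500)
    println(s"back off ${sendAt - now} ms before the next request")
  }
}
{code}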



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7126) Reduce number of rebalance period for large consumer groups after a topic is created

2018-07-20 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee reassigned KAFKA-7126:
--

Assignee: Jon Lee  (was: Dong Lin)

> Reduce number of rebalance period for large consumer groups after a topic is 
> created
> 
>
> Key: KAFKA-7126
> URL: https://issues.apache.org/jira/browse/KAFKA-7126
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Jon Lee
>Priority: Major
> Attachments: 1.diff
>
>
> For a group of 200 MirrorMaker consumers with pattern-based topic 
> subscription, a single topic creation caused 50 rebalances for each of these 
> consumers over a 5-minute period. This caused the MM to lag significantly 
> during those 5 minutes, and the clusters may be considerably out of sync 
> during that time.
> Ideally we would like to trigger only 1 rebalance in the MM group after a 
> topic is created, and conceptually it should be doable.
>  
> Here is the explanation of this repeated consumer rebalance, based on the 
> consumer rebalance logic in the latest Kafka code:
> 1) A topic with 10 partitions is created in the cluster, and it matches the 
> subscription pattern of the MM consumers.
> 2) The leader of the MM consumer group detects the new topic after a 
> metadata refresh and triggers a rebalance.
> 3) At time T0, the first rebalance finishes. 10 consumers are each assigned 
> 1 partition of this topic. The other 190 consumers are not assigned any 
> partition of this topic. At this moment, the newly created topic will appear 
> in `ConsumerCoordinator.subscriptions.subscription` for those consumers that 
> are assigned a partition of this topic or that have refreshed metadata 
> before time T0.
> 4) In the common case, half of the consumers have refreshed metadata before 
> the leader of the consumer group did. Thus around 100 + 10 = 110 consumers 
> have the newly created topic in 
> `ConsumerCoordinator.subscriptions.subscription`. The other 90 consumers do 
> not have this topic in `ConsumerCoordinator.subscriptions.subscription`.
> 5) For those 90 consumers, whenever one refreshes metadata, it will add this 
> topic to `ConsumerCoordinator.subscriptions.subscription`, which causes 
> `ConsumerCoordinator.rejoinNeededOrPending()` to return true and triggers 
> another rebalance. If a few consumers refresh metadata at almost the same 
> time, they jointly trigger one rebalance; otherwise, each triggers a 
> separate rebalance.
> 6) The default metadata.max.age.ms is 5 minutes. Thus in the worst case, 
> which is probably also the average case when the number of consumers in the 
> group is large, the last consumer will refresh its metadata 5 minutes after 
> T0, and rebalances will keep occurring throughout this 5-minute interval 
> (see the toy simulation sketched below).
>  
>  
>  
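
To make the arithmetic above concrete, here is a toy Scala simulation; the 3-second coalescing window and the uniformly random refresh times are assumptions for illustration only, not how the coordinator actually batches joins:
{code:java}
import scala.util.Random

// Toy simulation of the rebalance storm: each consumer refreshes metadata at a
// uniformly random time within metadata.max.age.ms, and refreshes landing in
// the same short window are assumed to coalesce into one rebalance.
object RebalanceStorm {
  def main(args: Array[String]): Unit = {
    val consumers = 200
    val maxAgeMs = 5 * 60 * 1000      // metadata.max.age.ms default
    val coalesceWindowMs = 3000       // assumed batching window
    val refreshTimes = Seq.fill(consumers)(Random.nextInt(maxAgeMs)).sorted

    // sweep through refresh times: a refresh outside the previous coalescing
    // window starts a new rebalance
    val (rebalances, _) = refreshTimes.foldLeft((0, Long.MinValue)) {
      case ((count, windowEnd), t) =>
        if (t > windowEnd) (count + 1, t.toLong + coalesceWindowMs)
        else (count, windowEnd)
    }
    println(s"~$rebalances rebalances over ${maxAgeMs / 1000} s")
  }
}
{code}
Under these assumptions it typically prints a few dozen rebalances, the same ballpark as the ~50 observed above.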



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7177) Update 2.0 documentation to reflect changed quota behaviors by KIP-219

2018-07-17 Thread Jon Lee (JIRA)
Jon Lee created KAFKA-7177:
--

 Summary: Update 2.0 documentation to reflect changed quota 
behaviors by KIP-219 
 Key: KAFKA-7177
 URL: https://issues.apache.org/jira/browse/KAFKA-7177
 Project: Kafka
  Issue Type: Task
  Components: documentation
Reporter: Jon Lee


KIP-219 changed the way quota violation is communicated between clients and 
brokers. Documentation should be updated accordingly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6944) Add system tests testing the new throttling behavior using older clients/brokers

2018-06-28 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee resolved KAFKA-6944.

Resolution: Fixed

> Add system tests testing the new throttling behavior using older 
> clients/brokers
> 
>
> Key: KAFKA-6944
> URL: https://issues.apache.org/jira/browse/KAFKA-6944
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Affects Versions: 2.0.0
>Reporter: Jon Lee
>Priority: Major
>
> KAFKA-6028 (KIP-219) changes the throttling behavior on quota violation as 
> follows:
>  * the broker will send out a response with throttle time to the client 
> immediately and mute the channel
>  * upon receiving a response with a non-zero throttle time, the client will 
> also block sending further requests to the broker until the throttle time is 
> over.
> The current system tests assume that both clients and brokers are of the same 
> version. We'll need an additional set of quota tests that test throttling 
> behavior between older clients and newer brokers and between newer clients 
> and older brokers. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6946) Keep the session id for incremental fetch when fetch responses are throttled

2018-05-24 Thread Jon Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee updated KAFKA-6946:
---
Summary: Keep the session id for incremental fetch when fetch responses are 
throttled   (was: Keep the fetch session id for incremental fetch when )

> Keep the session id for incremental fetch when fetch responses are throttled 
> -
>
> Key: KAFKA-6946
> URL: https://issues.apache.org/jira/browse/KAFKA-6946
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Jon Lee
>Priority: Major
>
> The current patch for KAFKA-6028 (KIP-219) sends a FetchResponse with 
> INVALID_SESSION_ID if the response needs to be throttled due to quota 
> violation. If it is for incremental fetch, this will make the client reset 
> its session and send a full fetch request next time. This is not a 
> correctness issue, but it may affect performance when fetches are throttled.
> In case of incremental fetch, a throttled response should use the same 
> session id as before so that the next unthrottled response can be in the same 
> session. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6946) Keep the fetch session id for incremental fetch when

2018-05-24 Thread Jon Lee (JIRA)
Jon Lee created KAFKA-6946:
--

 Summary: Keep the fetch session id for incremental fetch when 
 Key: KAFKA-6946
 URL: https://issues.apache.org/jira/browse/KAFKA-6946
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.0.0
Reporter: Jon Lee


The current patch for KAFKA-6028 (KIP-219) sends a FetchResponse with 
INVALID_SESSION_ID if the response needs to be throttled due to quota 
violation. If it is for incremental fetch, this will make the client reset its 
session and send a full fetch request next time. This is not a correctness 
issue, but it may affect performance when fetches are throttled.

In case of incremental fetch, a throttled response should use the same session 
id as before so that the next unthrottled response can be in the same session. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6944) Add system tests testing the new throttling behavior using older clients/brokers

2018-05-24 Thread Jon Lee (JIRA)
Jon Lee created KAFKA-6944:
--

 Summary: Add system tests testing the new throttling behavior 
using older clients/brokers
 Key: KAFKA-6944
 URL: https://issues.apache.org/jira/browse/KAFKA-6944
 Project: Kafka
  Issue Type: Test
  Components: system tests
Affects Versions: 2.0.0
Reporter: Jon Lee


KAFKA-6028 (KIP-219) changes the throttling behavior on quota violation as 
follows:
 * the broker will send out a response with throttle time to the client 
immediately and mute the channel
 * upon receiving a response with a non-zero throttle time, the client will 
also block sending further requests to the broker until the throttle time is 
over.

The current system tests assume that both clients and brokers are of the same 
version. We'll need an additional set of quota tests that test throttling 
behavior between older clients and newer brokers and between newer clients and 
older brokers. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)