[GitHub] [kafka] guozhangwang opened a new pull request #8629: MINOR: Update nodesWithPendingFetchRequests in Fetcher before sending request

2020-05-06 Thread GitBox


guozhangwang opened a new pull request #8629:
URL: https://github.com/apache/kafka/pull/8629


   It is possible that:
   
   1. The caller thread calls `client.send()` with the request and wakes up 
the client.
   2. The heartbeat thread calls `client.poll()` and gets the response back, 
without triggering the completion handler since it has not been added yet.
   3. `this.nodesWithPendingFetchRequests.add(entry.getKey().id())` is called.
   
   In this case, `nodesWithPendingFetchRequests` would prevent future requests 
from being sent to this node (we would keep seeing `Skipping fetch for 
partition...`), but there is no response left to remove the entry, so the 
consumer could be blocked without making progress.
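   A minimal, self-contained sketch of the reordering (illustrative names, not 
the actual `Fetcher` code; a `CompletableFuture` stands in for the network 
client's transport):

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class PendingRequestOrdering {
    private final Set<Integer> nodesWithPendingFetchRequests = ConcurrentHashMap.newKeySet();

    // Record the pending node BEFORE handing the request to the transport, so a
    // response completing on another thread always finds the entry to remove.
    CompletableFuture<Void> sendFetch(int nodeId, CompletableFuture<Void> transport) {
        nodesWithPendingFetchRequests.add(nodeId);
        return transport.whenComplete((r, e) -> nodesWithPendingFetchRequests.remove(nodeId));
    }

    public static void main(String[] args) {
        PendingRequestOrdering demo = new PendingRequestOrdering();
        CompletableFuture<Void> transport = new CompletableFuture<>();
        demo.sendFetch(1, transport);
        transport.complete(null); // the "response" may arrive on any thread
        System.out.println(demo.nodesWithPendingFetchRequests.isEmpty()); // true: entry was removed
    }
}
```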
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9967) SASL PLAIN authentication with custom callback handler

2020-05-06 Thread indira (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

indira updated KAFKA-9967:
--
Description: 
I'm trying to add a custom handler for SASL PLAIN authentication. I have 
followed the Kafka documentation, which says to add 
"listener.name.sasl_ssl.plain.sasl.server.callback.handler.class" with the 
custom class name to the server config, but this custom class is never picked 
up; authentication always falls back to the default PlainServerCallbackHandler.

On debugging the kafka-client code, I observed that the 
SaslChannelBuilder->createServerCallbackHandlers method tries to read the 
config property as "plain.sasl.server.callback.handler.class", which is 
different from the one mentioned in the doc. I changed the property name in my 
config file accordingly and tried again, but it still did not work.

Below is part of the config:

sasl.server.callback.handler.class=null, confluent.ssl.keystore.location=null, 
advertised.port=null, log.cleaner.dedupe.buffer.size=134217728, 
confluent.bearer.auth.token=null, confluent.tier.s3.aws.endpoint.override=null, 
log.cleaner.io.buffer.size=524288, create.topic.policy.class.name=null, 
confluent.missing.id.cache.ttl.sec=60, 
confluent.tier.fetcher.offset.cache.period.ms=6, 
controlled.shutdown.retry.backoff.ms=5000, security.providers=null, 
confluent.verify.group.subscription.prefix=false, 
listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.e2.test.security.PlainServerCallbackHandler, 
log.roll.hours=168, log.cleanup.policy=[delete], confluent.enabl

 

If there is any sample code explaining custom SASL authentication, it would 
help us. I could not find any proper sample code related to this topic.
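
For reference, a minimal sketch of a custom PLAIN server callback handler 
built against Kafka's public AuthenticateCallbackHandler API; the package name 
and credential check are illustrative, and the class would be registered via 
the listener-prefixed property quoted above:

```java
import java.util.List;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;

public class CustomPlainCallbackHandler implements AuthenticateCallbackHandler {
    @Override
    public void configure(Map<String, ?> configs, String saslMechanism,
                          List<AppConfigurationEntry> jaasConfigEntries) {
        // read any handler-specific settings here
    }

    @Override
    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plain = (PlainAuthenticateCallback) callback;
                // replace with a real credential check (LDAP, database, ...)
                plain.authenticated(authenticate(username, plain.password()));
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }

    private boolean authenticate(String username, char[] password) {
        return username != null; // placeholder check, illustrative only
    }

    @Override
    public void close() {}
}
```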

  was:
I'm trying to add a custom handler for SASL PLAIN authentication. I have 
followed the Kafka documentation, which says to add 
"listener.name.sasl_ssl.plain.sasl.server.callback.handler.class" with the 
custom class name to the server config, but this custom class is never picked 
up; authentication always falls back to the default PlainServerCallbackHandler.

On debugging the kafka-client code, I observed that the 
SaslChannelBuilder->createServerCallbackHandlers method tries to read the 
config property as "plain.sasl.server.callback.handler.class", which is 
different from the one mentioned in the doc. I changed the property name in my 
config file accordingly and tried again, but it still did not work.

If there is any sample code explaining custom SASL authentication, it would 
help us. I could not find any proper sample code related to this topic.


> SASL PLAIN authentication with custom callback handler
> --
>
> Key: KAFKA-9967
> URL: https://issues.apache.org/jira/browse/KAFKA-9967
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: indira
>Priority: Major
>
> I'm trying to add a custom handler for SASL PLAIN authentication. I have 
> followed the Kafka documentation, which says to add 
> "listener.name.sasl_ssl.plain.sasl.server.callback.handler.class" with the 
> custom class name to the server config, but this custom class is never picked 
> up; authentication always falls back to the default 
> PlainServerCallbackHandler.
> On debugging the kafka-client code, I observed that the 
> SaslChannelBuilder->createServerCallbackHandlers method tries to read the 
> config property as "plain.sasl.server.callback.handler.class", which is 
> different from the one mentioned in the doc. I changed the property name in 
> my config file accordingly and tried again, but it still did not work.
> Below is part of the config:
> sasl.server.callback.handler.class=null, 
> confluent.ssl.keystore.location=null, advertised.port=null, 
> log.cleaner.dedupe.buffer.size=134217728, confluent.bearer.auth.token=null, 
> confluent.tier.s3.aws.endpoint.override=null, 
> log.cleaner.io.buffer.size=524288, create.topic.policy.class.name=null, 
> confluent.missing.id.cache.ttl.sec=60, 
> confluent.tier.fetcher.offset.cache.period.ms=6, 
> controlled.shutdown.retry.backoff.ms=5000, security.providers=null, 
> confluent.verify.group.subscription.prefix=false, 
> listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.e2.test.security.PlainServerCallbackHandler, 
> log.roll.hours=168, log.cleanup.policy=[delete], confluent.enabl
>  
> If there is any sample code explaining custom SASL authentication, it would 
> help us. I could not find any proper sample code related to this topic.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9967) SASL PLAIN authentication with custom callback handler

2020-05-06 Thread indira (Jira)
indira created KAFKA-9967:
-

 Summary: SASL PLAIN authentication with custom callback handler
 Key: KAFKA-9967
 URL: https://issues.apache.org/jira/browse/KAFKA-9967
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.5.0
Reporter: indira


I'm trying to add a custom handler for SASL PLAIN authentication. I have 
followed the Kafka documentation, which says to add 
"listener.name.sasl_ssl.plain.sasl.server.callback.handler.class" with the 
custom class name to the server config, but this custom class is never picked 
up; authentication always falls back to the default PlainServerCallbackHandler.

On debugging the kafka-client code, I observed that the 
SaslChannelBuilder->createServerCallbackHandlers method tries to read the 
config property as "plain.sasl.server.callback.handler.class", which is 
different from the one mentioned in the doc. I changed the property name in my 
config file accordingly and tried again, but it still did not work.

If there is any sample code explaining custom SASL authentication, it would 
help us. I could not find any proper sample code related to this topic.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #8627: MINOR - Increase Trogdor Histogram buckets for latency to 10000ms

2020-05-06 Thread GitBox


cmccabe commented on pull request #8627:
URL: https://github.com/apache/kafka/pull/8627#issuecomment-625017914


   test failure is not related



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #8623: MINOR: Update the documentations

2020-05-06 Thread GitBox


showuon commented on pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#issuecomment-625012168


   Hi @kkonstantine , could you please review this small PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #8622: MINOR: Update stream documentation

2020-05-06 Thread GitBox


showuon commented on pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#issuecomment-625011283


   Hi @bbejeck , could you please review this small PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-06 Thread GitBox


feyman2016 commented on pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#issuecomment-625008508


   @abbccdda Hey, updated based on comments, and also left some comments there, 
thanks. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-06 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r421216977



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource 
resource) {
 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
 }
 
+    private List<MemberIdentity> getMembersFromGroup(String groupId) {
+        Collection<MemberDescription> members = new ArrayList<>();
+        try {
+            members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+        } catch (Throwable ex) {
+            System.out.println("Encounter exception when trying to get members from group: " + groupId);
+            ex.printStackTrace();
+        }
+
+        List<MemberIdentity> memberToRemove = new ArrayList<>();
+        for (MemberDescription member: members) {

Review comment:
   I reran the style check myself, but it didn't capture any error. I assume 
the error was the missing `final` in the for loop; updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-06 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r421216617



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource 
resource) {
 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
 }
 
+    private List<MemberIdentity> getMembersFromGroup(String groupId) {
+        Collection<MemberDescription> members = new ArrayList<>();
+        try {
+            members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+        } catch (Throwable ex) {
+            System.out.println("Encounter exception when trying to get members from group: " + groupId);
+            ex.printStackTrace();

Review comment:
   Fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-06 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r421216746



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##
@@ -32,12 +32,23 @@
 public class RemoveMembersFromConsumerGroupOptions extends AbstractOptions<RemoveMembersFromConsumerGroupOptions> {
 
     private Set<MemberToRemove> members;

Review comment:
   Updated~
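
   For context, a short sketch of how the extended options are meant to be 
used once this change lands (the no-argument constructor, meaning "remove all 
active members", comes from KIP-571; the broker address and group id are 
illustrative):

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

public class ForceRemoveMembersExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Passing no member ids asks the broker to remove ALL active
            // members of the group, which is what StreamsResetter's force
            // option builds on.
            admin.removeMembersFromConsumerGroup("my-streams-app",
                    new RemoveMembersFromConsumerGroupOptions()).all().get();
        }
    }
}
```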





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-06 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r420824572



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource 
resource) {
 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
 }
 
+    private List<MemberIdentity> getMembersFromGroup(String groupId) {
+        Collection<MemberDescription> members = new ArrayList<>();
+        try {
+            members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+        } catch (Throwable ex) {
+            System.out.println("Encounter exception when trying to get members from group: " + groupId);

Review comment:
   Fixed~





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-9660) KAFKA-1 build a kafka-exporter by java

2020-05-06 Thread HaiyuanZhao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17101334#comment-17101334
 ] 

HaiyuanZhao edited comment on KAFKA-9660 at 5/7/20, 3:04 AM:
-

[~tigerlee] hi, I am interested in this proposal :)


was (Author: zhaohaidao):
I am interested in this proposal :)

>  KAFKA-1 build a kafka-exporter by java
> ---
>
> Key: KAFKA-9660
> URL: https://issues.apache.org/jira/browse/KAFKA-9660
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, metrics
>Affects Versions: 0.10.2.0, 1.1.0, 2.0.0
> Environment: java8+
>Reporter: francis lee
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: newbie
>
> [KIP-575|https://cwiki.apache.org/confluence/display/KAFKA/KIP-575%3A+build+a+Kafka-Exporter+by+Java]
> Kafka is an excellent MQ running on the JVM, but there is no JVM-based 
> exporter. For a better future of the Kafka ecosystem, Apache needs an 
> official exporter such as [https://github.com/apache/kafka-exporter].
> I wrote one for work and hope to donate it to Apache. There are a lot of 
> metrics in JMX, and the ones to export can be configured in the 
> exporter-config.
>  
> if you are interested in it, join me!
> if you are interested in it, join me!
> if you are interested in it, join me!
>  
> for some metric list here:
> kafka_AddPartitionsToTxn_50thPercentile
> kafka_AddPartitionsToTxn_95thPercentile
> kafka_AddPartitionsToTxn_999thPercentile
> kafka_AddPartitionsToTxn_99thPercentile
> kafka_AddPartitionsToTxn_Count
> kafka_AddPartitionsToTxn_Max
> kafka_AddPartitionsToTxn_Mean
> kafka_AddPartitionsToTxn_MeanRate
> kafka_AddPartitionsToTxn_Min
> kafka_AddPartitionsToTxn_OneMinuteRate
> kafka_AddPartitionsToTxn_StdDev
> kafka_BrokerTopicMetrics_BytesInPerSec_Count
> kafka_BrokerTopicMetrics_BytesInPerSec_MeanRate
> kafka_BrokerTopicMetrics_BytesInPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_BytesOutPerSec_Count
> kafka_BrokerTopicMetrics_BytesOutPerSec_MeanRate
> kafka_BrokerTopicMetrics_BytesOutPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_BytesRejectedPerSec_Count
> kafka_BrokerTopicMetrics_BytesRejectedPerSec_MeanRate
> kafka_BrokerTopicMetrics_BytesRejectedPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_FailedFetchRequestsPerSec_Count
> kafka_BrokerTopicMetrics_FailedFetchRequestsPerSec_MeanRate
> kafka_BrokerTopicMetrics_FailedFetchRequestsPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_FailedProduceRequestsPerSec_Count
> kafka_BrokerTopicMetrics_FailedProduceRequestsPerSec_MeanRate
> kafka_BrokerTopicMetrics_FailedProduceRequestsPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_MessagesInPerSec_Count
> kafka_BrokerTopicMetrics_MessagesInPerSec_MeanRate
> kafka_BrokerTopicMetrics_MessagesInPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_ProduceMessageConversionsPerSec_Count
> kafka_BrokerTopicMetrics_ProduceMessageConversionsPerSec_MeanRate
> kafka_BrokerTopicMetrics_ProduceMessageConversionsPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_ReplicationBytesInPerSec_Count
> kafka_BrokerTopicMetrics_ReplicationBytesInPerSec_MeanRate
> kafka_BrokerTopicMetrics_ReplicationBytesInPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_ReplicationBytesOutPerSec_Count
> kafka_BrokerTopicMetrics_ReplicationBytesOutPerSec_MeanRate
> kafka_BrokerTopicMetrics_ReplicationBytesOutPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_TotalFetchRequestsPerSec_Count
> kafka_BrokerTopicMetrics_TotalFetchRequestsPerSec_MeanRate
> kafka_BrokerTopicMetrics_TotalFetchRequestsPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_TotalProduceRequestsPerSec_Count
> kafka_BrokerTopicMetrics_TotalProduceRequestsPerSec_MeanRate
> kafka_BrokerTopicMetrics_TotalProduceRequestsPerSec_OneMinuteRate
> kafka_BytesInPerSec_Count
> kafka_BytesInPerSec_FifteenMinuteRate
> kafka_BytesInPerSec_FiveMinuteRate
> kafka_BytesInPerSec_MeanRate
> kafka_BytesInPerSec_OneMinuteRate
> kafka_BytesOutPerSec_Count
> kafka_BytesOutPerSec_FifteenMinuteRate
> kafka_BytesOutPerSec_FiveMinuteRate
> kafka_BytesOutPerSec_MeanRate
> kafka_BytesOutPerSec_OneMinuteRate
> kafka_BytesRejectedPerSec_Count
> kafka_BytesRejectedPerSec_FifteenMinuteRate
> kafka_BytesRejectedPerSec_FiveMinuteRate
> kafka_BytesRejectedPerSec_MeanRate
> kafka_BytesRejectedPerSec_OneMinuteRate
> kafka_CreatePartitions_50thPercentile
> kafka_CreatePartitions_95thPercentile
> kafka_CreatePartitions_999thPercentile
> kafka_CreatePartitions_99thPercentile
> kafka_CreatePartitions_Count
> kafka_CreatePartitions_Max
> kafka_CreatePartitions_Mean
> kafka_CreatePartitions_MeanRate
> kafka_CreatePartitions_Min
> kafka_CreatePartitions_OneMinuteRate
> kafka_CreatePartitions_StdDev
> kafka_CreateTopics_50thPercentile
> kafka_CreateTopics_95thPercentile
> kafka_CreateTopics_999thPercentile
> kafka_CreateTopics_99thPercentile
> kafka_CreateTopics_Count
> 
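
For readers unfamiliar with the mechanism the proposal builds on, a minimal 
JMX-reading sketch (the ObjectName follows Kafka's standard broker metric 
naming; a real exporter would connect remotely and iterate a configurable list 
of name patterns):

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxMetricProbe {
    public static void main(String[] args) throws Exception {
        // Reads one broker metric from the local JVM's MBean server, so this
        // only finds the bean when run inside a broker process; a real
        // exporter would connect remotely via JMXConnectorFactory and walk a
        // configurable list of ObjectName patterns.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName(
                "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
        Object oneMinuteRate = server.getAttribute(name, "OneMinuteRate");
        System.out.println(
                "kafka_BrokerTopicMetrics_MessagesInPerSec_OneMinuteRate = " + oneMinuteRate);
    }
}
```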

[jira] [Commented] (KAFKA-9660) KAFKA-1 build a kafka-exporter by java

2020-05-06 Thread HaiyuanZhao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17101334#comment-17101334
 ] 

HaiyuanZhao commented on KAFKA-9660:


I am interested in this proposal :)

>  KAFKA-1 build a kafka-exporter by java
> ---
>
> Key: KAFKA-9660
> URL: https://issues.apache.org/jira/browse/KAFKA-9660
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, metrics
>Affects Versions: 0.10.2.0, 1.1.0, 2.0.0
> Environment: java8+
>Reporter: francis lee
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: newbie
>
> [KIP-575|https://cwiki.apache.org/confluence/display/KAFKA/KIP-575%3A+build+a+Kafka-Exporter+by+Java]
> Kafka is an excellent MQ running on the JVM, but there is no JVM-based 
> exporter. For a better future of the Kafka ecosystem, Apache needs an 
> official exporter such as [https://github.com/apache/kafka-exporter].
> I wrote one for work and hope to donate it to Apache. There are a lot of 
> metrics in JMX, and the ones to export can be configured in the 
> exporter-config.
>  
> if you are interested in it, join me!
> if you are interested in it, join me!
> if you are interested in it, join me!
>  
> for some metric list here:
> kafka_AddPartitionsToTxn_50thPercentile
> kafka_AddPartitionsToTxn_95thPercentile
> kafka_AddPartitionsToTxn_999thPercentile
> kafka_AddPartitionsToTxn_99thPercentile
> kafka_AddPartitionsToTxn_Count
> kafka_AddPartitionsToTxn_Max
> kafka_AddPartitionsToTxn_Mean
> kafka_AddPartitionsToTxn_MeanRate
> kafka_AddPartitionsToTxn_Min
> kafka_AddPartitionsToTxn_OneMinuteRate
> kafka_AddPartitionsToTxn_StdDev
> kafka_BrokerTopicMetrics_BytesInPerSec_Count
> kafka_BrokerTopicMetrics_BytesInPerSec_MeanRate
> kafka_BrokerTopicMetrics_BytesInPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_BytesOutPerSec_Count
> kafka_BrokerTopicMetrics_BytesOutPerSec_MeanRate
> kafka_BrokerTopicMetrics_BytesOutPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_BytesRejectedPerSec_Count
> kafka_BrokerTopicMetrics_BytesRejectedPerSec_MeanRate
> kafka_BrokerTopicMetrics_BytesRejectedPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_FailedFetchRequestsPerSec_Count
> kafka_BrokerTopicMetrics_FailedFetchRequestsPerSec_MeanRate
> kafka_BrokerTopicMetrics_FailedFetchRequestsPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_FailedProduceRequestsPerSec_Count
> kafka_BrokerTopicMetrics_FailedProduceRequestsPerSec_MeanRate
> kafka_BrokerTopicMetrics_FailedProduceRequestsPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_MessagesInPerSec_Count
> kafka_BrokerTopicMetrics_MessagesInPerSec_MeanRate
> kafka_BrokerTopicMetrics_MessagesInPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_ProduceMessageConversionsPerSec_Count
> kafka_BrokerTopicMetrics_ProduceMessageConversionsPerSec_MeanRate
> kafka_BrokerTopicMetrics_ProduceMessageConversionsPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_ReplicationBytesInPerSec_Count
> kafka_BrokerTopicMetrics_ReplicationBytesInPerSec_MeanRate
> kafka_BrokerTopicMetrics_ReplicationBytesInPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_ReplicationBytesOutPerSec_Count
> kafka_BrokerTopicMetrics_ReplicationBytesOutPerSec_MeanRate
> kafka_BrokerTopicMetrics_ReplicationBytesOutPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_TotalFetchRequestsPerSec_Count
> kafka_BrokerTopicMetrics_TotalFetchRequestsPerSec_MeanRate
> kafka_BrokerTopicMetrics_TotalFetchRequestsPerSec_OneMinuteRate
> kafka_BrokerTopicMetrics_TotalProduceRequestsPerSec_Count
> kafka_BrokerTopicMetrics_TotalProduceRequestsPerSec_MeanRate
> kafka_BrokerTopicMetrics_TotalProduceRequestsPerSec_OneMinuteRate
> kafka_BytesInPerSec_Count
> kafka_BytesInPerSec_FifteenMinuteRate
> kafka_BytesInPerSec_FiveMinuteRate
> kafka_BytesInPerSec_MeanRate
> kafka_BytesInPerSec_OneMinuteRate
> kafka_BytesOutPerSec_Count
> kafka_BytesOutPerSec_FifteenMinuteRate
> kafka_BytesOutPerSec_FiveMinuteRate
> kafka_BytesOutPerSec_MeanRate
> kafka_BytesOutPerSec_OneMinuteRate
> kafka_BytesRejectedPerSec_Count
> kafka_BytesRejectedPerSec_FifteenMinuteRate
> kafka_BytesRejectedPerSec_FiveMinuteRate
> kafka_BytesRejectedPerSec_MeanRate
> kafka_BytesRejectedPerSec_OneMinuteRate
> kafka_CreatePartitions_50thPercentile
> kafka_CreatePartitions_95thPercentile
> kafka_CreatePartitions_999thPercentile
> kafka_CreatePartitions_99thPercentile
> kafka_CreatePartitions_Count
> kafka_CreatePartitions_Max
> kafka_CreatePartitions_Mean
> kafka_CreatePartitions_MeanRate
> kafka_CreatePartitions_Min
> kafka_CreatePartitions_OneMinuteRate
> kafka_CreatePartitions_StdDev
> kafka_CreateTopics_50thPercentile
> kafka_CreateTopics_95thPercentile
> kafka_CreateTopics_999thPercentile
> kafka_CreateTopics_99thPercentile
> kafka_CreateTopics_Count
> kafka_CreateTopics_Max
> kafka_CreateTopics_Mean
> kafka_CreateTopics_MeanRate
> kafka_CreateTopics_Min
> 

[jira] [Commented] (KAFKA-9929) Support reverse iterator on WindowStore

2020-05-06 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17101306#comment-17101306
 ] 

Guozhang Wang commented on KAFKA-9929:
--

I looked at the RocksIterator, and I think it is feasible. Maybe we can start 
the implementation in parallel while preparing the KIP, in case we find any 
gotchas during the implementation that may impact our API design.

> Support reverse iterator on WindowStore
> ---
>
> Key: KAFKA-9929
> URL: https://issues.apache.org/jira/browse/KAFKA-9929
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> Currently, WindowStore fetch operations return an iterator sorted from 
> earliest to latest result:
> ```
> * For each key, the iterator guarantees ordering of windows, starting from 
> the oldest/earliest
> * available window to the newest/latest window.
> ```
>  
> We have a use-case where traces are stored in a WindowStore 
> and Kafka Streams is used to create a materialized view of traces. A query 
> request comes with a time range (e.g. now-1h, now) and wants to return the 
> most recent results, i.e. fetch from this period of time, iterate and pattern 
> match the latest/most recent traces, and if there are enough results, reply 
> without moving further on the iterator.
> The same store is used to search for previous traces. In this case, it 
> searches a key over the last day; if traces are found, we would also like to 
> iterate from the most recent.
> RocksDb seems to support iterating backward and forward: 
> [https://github.com/facebook/rocksdb/wiki/Iterator#iterating-upper-bound-and-lower-bound]
>  
> For reference: This in some way extracts some bits from this previous issue: 
> https://issues.apache.org/jira/browse/KAFKA-4212:
>  
> > The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via 
> > segment dropping, but it stores multiple items per key, based on their 
> > timestamp. But this store can be repurposed as a cache by fetching the 
> > items in reverse chronological order and returning the first item found.
>  
> Would like to know if there is any impediment in RocksDb or WindowStore to 
> supporting this.
> Adding an argument to reverse in current fetch methods would be great:
> ```
> WindowStore.fetch(from,to,Direction.BACKWARD|FORWARD)
> ```
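
As a point of reference for the feasibility question above, a minimal sketch 
of backward iteration with the RocksDB Java API (rocksdbjni); the path and 
keys are illustrative:

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public class ReverseScanDemo {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/reverse-scan-demo")) {
            db.put("k1".getBytes(), "v1".getBytes());
            db.put("k2".getBytes(), "v2".getBytes());
            db.put("k3".getBytes(), "v3".getBytes());
            try (RocksIterator it = db.newIterator()) {
                // Position at the last key <= the upper bound, then walk backward.
                it.seekForPrev("k3".getBytes());
                for (; it.isValid(); it.prev()) {
                    System.out.println(new String(it.key()) + " = " + new String(it.value()));
                }
            }
        }
    }
}
```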



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

2020-05-06 Thread GitBox


kkonstantine commented on pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#issuecomment-624979305


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

2020-05-06 Thread GitBox


kkonstantine commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r421189122



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -193,13 +194,11 @@ public void transitionTo(TargetState state) {
 @Override
 public void execute() {
 initializeAndStart();
-try {
+// Make sure any uncommitted data has been committed and the task has
+// a chance to clean up its state
+try (UncheckedCloseable supressible = this::closePartitions) {

Review comment:
   Although I dig "supressible" with one p, like "sup? exception, what are you 
up to?", I'm afraid I'll have to abide by the usual rules and recommend 
`suppressible` here (probably the IDE will complain about the typo too).
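
   For readers following along, a minimal, self-contained sketch of the 
suppression behavior the diff relies on; the shape of the `UncheckedCloseable` 
interface is assumed here to mirror Kafka's internal one:

```java
public class SuppressionDemo {
    // Functional variant of AutoCloseable so a method reference like
    // this::closePartitions can be used in try-with-resources.
    @FunctionalInterface
    interface UncheckedCloseable extends AutoCloseable {
        @Override
        void close();
    }

    public static void main(String[] args) {
        try {
            try (UncheckedCloseable suppressible = () -> { throw new RuntimeException("close failed"); }) {
                throw new IllegalStateException("task failed");
            }
        } catch (Exception e) {
            // The original exception wins; the close failure is attached as a
            // suppressed exception rather than shadowing it, which is the
            // point of the PR.
            System.out.println("primary:    " + e.getMessage());
            System.out.println("suppressed: " + e.getSuppressed()[0].getMessage());
        }
    }
}
```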





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

2020-05-06 Thread GitBox


kkonstantine commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r421188159



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##
@@ -856,6 +858,47 @@ public void run() {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testSinkTasksHandleCloseErrors() throws Exception {

Review comment:
   That's the testing I had in mind. That's great. 
   No need to change the exception type right now. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9768) rest.advertised.listener configuration is not handled properly by the worker

2020-05-06 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-9768.
---
Resolution: Fixed

> rest.advertised.listener configuration is not handled properly by the worker
> 
>
> Key: KAFKA-9768
> URL: https://issues.apache.org/jira/browse/KAFKA-9768
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> The {{rest.advertised.listener}} config can currently be set to either "http" 
> or "https", and a listener with that protocol should be used when advertising 
> the URL of the worker to other members of the Connect cluster.
> For example, someone might configure their worker with a {{listeners}} value 
> of {{https://localhost:42069,http://localhost:4761}} and a 
> {{rest.advertised.listener}} value of {{http}}, which should cause the 
> worker to listen on port {{42069}} with TLS and port {{4761}} with plaintext, 
> and advertise the URL {{http://localhost:4761}} to other workers.
> However, the worker instead advertises the URL {{https://localhost:42069}} 
> to other workers. This is because the {{RestServer}} class, which is 
> responsible for determining which URL to advertise to other workers, simply 
> [chooses the first listener whose name begins with the 
> protocol|https://github.com/apache/kafka/blob/0f48446690e42b78a9a6b8c6a9bbab9f01d84cb1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L422]
>  specified in the {{rest.advertised.listener}} config.
> This breaks because "http" is a prefix of "https", so if the advertised 
> listener is "http" but the first listener that's found starts with 
> "https://", that listener will still be chosen.
> This bug has been present since SSL support (and the 
> {{rest.advertised.listener}} config) were added via 
> [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface],
>  in release 1.1.0.
> This bug only presents itself in the case where a user has set 
> {{rest.advertised.listener}} to {{http}} but the {{listeners}} list begins 
> with a listener that uses {{https}}. A workaround is to change the order of 
> the {{listeners}} list to put the desired advertised listener at the 
> beginning.
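
An illustrative sketch of the matching bug and its fix (not the actual 
`RestServer` code):

```java
import java.util.Arrays;
import java.util.List;

public class AdvertisedListenerDemo {
    // Buggy: matching on the raw protocol name lets "https://..." satisfy "http".
    static String pickListenerBuggy(List<String> listeners, String protocol) {
        return listeners.stream().filter(l -> l.startsWith(protocol)).findFirst().orElse(null);
    }

    // Fixed: require the scheme delimiter so "http" matches only "http://...".
    static String pickListenerFixed(List<String> listeners, String protocol) {
        return listeners.stream().filter(l -> l.startsWith(protocol + "://")).findFirst().orElse(null);
    }

    public static void main(String[] args) {
        List<String> listeners = Arrays.asList("https://localhost:42069", "http://localhost:4761");
        System.out.println(pickListenerBuggy(listeners, "http")); // https://localhost:42069 (wrong)
        System.out.println(pickListenerFixed(listeners, "http")); // http://localhost:4761
    }
}
```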



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9768) rest.advertised.listener configuration is not handled properly by the worker

2020-05-06 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-9768:
--
Fix Version/s: 2.5.1
   2.6.0

> rest.advertised.listener configuration is not handled properly by the worker
> 
>
> Key: KAFKA-9768
> URL: https://issues.apache.org/jira/browse/KAFKA-9768
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> The {{rest.advertised.listener}} config can currently be set to either "http" 
> or "https", and a listener with that protocol should be used when advertising 
> the URL of the worker to other members of the Connect cluster.
> For example, someone might configure their worker with a {{listeners}} value 
> of {{https://localhost:42069,http://localhost:4761}} and a 
> {{rest.advertised.listener}} value of {{http}}, which should cause the 
> worker to listen on port {{42069}} with TLS and port {{4761}} with plaintext, 
> and advertise the URL {{http://localhost:4761}} to other workers.
> However, the worker instead advertises the URL {{https://localhost:42069}} 
> to other workers. This is because the {{RestServer}} class, which is 
> responsible for determining which URL to advertise to other workers, simply 
> [chooses the first listener whose name begins with the 
> protocol|https://github.com/apache/kafka/blob/0f48446690e42b78a9a6b8c6a9bbab9f01d84cb1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L422]
>  specified in the {{rest.advertised.listener}} config.
> This breaks because "http" is a prefix of "https", so if the advertised 
> listener is "http" but the first listener that's found starts with 
> "https://", that listener will still be chosen.
> This bug has been present since SSL support (and the 
> {{rest.advertised.listener}} config) were added via 
> [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface],
>  in release 1.1.0.
> This bug only presents itself in the case where a user has set 
> {{rest.advertised.listener}} to {{http}} but the {{listeners}} list begins 
> with a listener that uses {{https}}. A workaround is to change the order of 
> the {{listeners}} list to put the desired advertised listener at the 
> beginning.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9768) rest.advertised.listener configuration is not handled properly by the worker

2020-05-06 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-9768:
--
Fix Version/s: 2.4.2

> rest.advertised.listener configuration is not handled properly by the worker
> 
>
> Key: KAFKA-9768
> URL: https://issues.apache.org/jira/browse/KAFKA-9768
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> The {{rest.advertised.listener}} config can currently be set to either "http" 
> or "https", and a listener with that protocol should be used when advertising 
> the URL of the worker to other members of the Connect cluster.
> For example, someone might configure their worker with a {{listeners}} value 
> of {{https://localhost:42069,http://localhost:4761}} and a 
> {{rest.advertised.listener}} value of {{http}}, which should cause the 
> worker to listen on port {{42069}} with TLS and port {{4761}} with plaintext, 
> and advertise the URL {{http://localhost:4761}} to other workers.
> However, the worker instead advertises the URL {{https://localhost:42069}} 
> to other workers. This is because the {{RestServer}} class, which is 
> responsible for determining which URL to advertise to other workers, simply 
> [chooses the first listener whose name begins with the 
> protocol|https://github.com/apache/kafka/blob/0f48446690e42b78a9a6b8c6a9bbab9f01d84cb1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L422]
>  specified in the {{rest.advertised.listener}} config.
> This breaks because "http" is a prefix of "https", so if the advertised 
> listener is "http" but the first listener that's found starts with 
> "https://", that listener will still be chosen.
> This bug has been present since SSL support (and the 
> {{rest.advertised.listener}} config) were added via 
> [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface],
>  in release 1.1.0.
> This bug only presents itself in the case where a user has set 
> {{rest.advertised.listener}} to {{http}} but the {{listeners}} list begins 
> with a listener that uses {{https}}. A workaround is to change the order of 
> the {{listeners}} list to put the desired advertised listener at the 
> beginning.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9966) Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta

2020-05-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17101293#comment-17101293
 ] 

Matthias J. Sax commented on KAFKA-9966:


Might be related to KAFKA-9831.

> Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-9966
> URL: https://issues.apache.org/jira/browse/KAFKA-9966
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/285/testReport/junit/org.apache.kafka.streams.integration/EosBetaUpgradeIntegrationTest/shouldUpgradeFromEosAlphaToEosBeta_true_/]
> {quote}java.lang.AssertionError: Condition not met within timeout 6. 
> Clients did not startup and stabilize on time. Observed transitions: client-1 
> transitions: [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] 
> client-2 transitions: [KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, 
> RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.waitForStateTransition(EosBetaUpgradeIntegrationTest.java:924)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:741){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9966) Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta

2020-05-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-9966:
--

Assignee: Matthias J. Sax

> Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-9966
> URL: https://issues.apache.org/jira/browse/KAFKA-9966
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/285/testReport/junit/org.apache.kafka.streams.integration/EosBetaUpgradeIntegrationTest/shouldUpgradeFromEosAlphaToEosBeta_true_/]
> {quote}java.lang.AssertionError: Condition not met within timeout 6. 
> Clients did not startup and stabilize on time. Observed transitions: client-1 
> transitions: [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] 
> client-2 transitions: [KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, 
> RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.waitForStateTransition(EosBetaUpgradeIntegrationTest.java:924)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:741){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9966) Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta

2020-05-06 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-9966:
--

 Summary: Flaky Test 
EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta
 Key: KAFKA-9966
 URL: https://issues.apache.org/jira/browse/KAFKA-9966
 Project: Kafka
  Issue Type: Improvement
  Components: streams, unit tests
Reporter: Matthias J. Sax


[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/285/testReport/junit/org.apache.kafka.streams.integration/EosBetaUpgradeIntegrationTest/shouldUpgradeFromEosAlphaToEosBeta_true_/]
{quote}java.lang.AssertionError: Condition not met within timeout 6. 
Clients did not startup and stabilize on time. Observed transitions: client-1 
transitions: [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] 
client-2 transitions: [KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, 
RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.waitForStateTransition(EosBetaUpgradeIntegrationTest.java:924)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:741){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on pull request #8360: KAFKA-9768: Fix handling of rest.advertised.listener config

2020-05-06 Thread GitBox


kkonstantine commented on pull request #8360:
URL: https://github.com/apache/kafka/pull/8360#issuecomment-624970463


   Only a couple of Streams integration test failures on every JDK. 
   These are unrelated to this PR, so I'm merging. 
   Thanks for the fix @C0urante !



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9949) Flaky Test GlobalKTableIntegrationTest#shouldKStreamGlobalKTableLeftJoin

2020-05-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17101291#comment-17101291
 ] 

Matthias J. Sax commented on KAFKA-9949:


[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/285/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableIntegrationTest/shouldKStreamGlobalKTableLeftJoin/]

> Flaky Test GlobalKTableIntegrationTest#shouldKStreamGlobalKTableLeftJoin
> 
>
> Key: KAFKA-9949
> URL: https://issues.apache.org/jira/browse/KAFKA-9949
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/248/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableIntegrationTest/shouldKStreamGlobalKTableLeftJoin/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> waiting for final values at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) at 
> org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableIntegrationTest.java:175){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #8603: MINOR: Fix ProcessorContext JavaDocs

2020-05-06 Thread GitBox


mjsax commented on pull request #8603:
URL: https://github.com/apache/kafka/pull/8603#issuecomment-624970217


   Java 8: 
`org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableJoin[exactly_once_beta]`
   Java 11: timed out
   Java 14:
   ```
   
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   
org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.shouldKStreamGlobalKTableLeftJoin
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8621: KAFKA-9466: Update Kafka Streams docs for KIP-447

2020-05-06 Thread GitBox


mjsax commented on pull request #8621:
URL: https://github.com/apache/kafka/pull/8621#issuecomment-624968745


   @abbccdda Updated this PR -- added more details and fixed some links and 
inconsistencies.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8621: KAFKA-9466: Update Kafka Streams docs for KIP-447

2020-05-06 Thread GitBox


mjsax commented on a change in pull request #8621:
URL: https://github.com/apache/kafka/pull/8621#discussion_r421167972



##
File path: docs/streams/core-concepts.html
##
@@ -206,16 +206,16 @@ Pr
 to the stream processing pipeline, known as the <a href="http://lambda-architecture.net/">Lambda Architecture</a>.
 Prior to 0.11.0.0, Kafka only provides at-least-once delivery 
guarantees and hence any stream processing systems that leverage it as the 
backend storage could not guarantee end-to-end exactly-once semantics.
 In fact, even for those stream processing systems that claim to 
support exactly-once processing, as long as they are reading from / writing to 
Kafka as the source / sink, their applications cannot actually guarantee that
-no duplicates will be generated throughout the pipeline.
+no duplicates will be generated throughout the pipeline.
 
 Since the 0.11.0.0 release, Kafka has added support to allow its 
producers to send messages to different topic partitions in a <a href="https://kafka.apache.org/documentation/#semantics">transactional and 
idempotent manner</a>,
 and Kafka Streams has hence added the end-to-end exactly-once 
processing semantics by leveraging these features.
 More specifically, it guarantees that for any record read from the 
source Kafka topics, its processing results will be reflected exactly once in 
the output Kafka topic as well as in the state stores for stateful operations.
 Note the key difference between Kafka Streams end-to-end exactly-once 
guarantee with other stream processing frameworks' claimed guarantees is that 
Kafka Streams tightly integrates with the underlying Kafka storage system and 
ensure that
 commits on the input topic offsets, updates on the state stores, and 
writes to the output topics will be completed atomically instead of treating 
Kafka as an external system that may have side-effects.
-To read more details on how this is done inside Kafka Streams, readers 
are recommended to read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics">KIP-129</a>.
+To read more details on how this is done inside Kafka Streams, readers 
are recommended to read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics">KIP-129</a>.
 
-In order to achieve exactly-once semantics when running Kafka Streams 
applications, users can simply set the processing.guarantee config 
value to exactly_once (default value is at_least_once).
+In order to achieve exactly-once semantics when running Kafka Streams 
applications, users can simply set the processing.guarantee config 
value (default value is at_least_once) to exactly_once or 
exactly_once_beta (requires brokers version 2.5 or newer).
 More details can be found in the Kafka Streams 
Configs section.

Review comment:
   Oh, I missed extending this section... IMHO, it's good to have the details 
in the config section, as many users (especially existing users) won't read 
the "concepts" page.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8621: KAFKA-9466: Update Kafka Streams docs for KIP-447

2020-05-06 Thread GitBox


mjsax commented on a change in pull request #8621:
URL: https://github.com/apache/kafka/pull/8621#discussion_r421166821



##
File path: docs/upgrade.html
##
@@ -19,6 +19,12 @@
 
 

[jira] [Created] (KAFKA-9965) Uneven distribution with RoundRobinPartitioner in AK 2.4+

2020-05-06 Thread Michael Bingham (Jira)
Michael Bingham created KAFKA-9965:
--

 Summary: Uneven distribution with RoundRobinPartitioner in AK 2.4+
 Key: KAFKA-9965
 URL: https://issues.apache.org/jira/browse/KAFKA-9965
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 2.4.1, 2.5.0, 2.4.0
Reporter: Michael Bingham


{{RoundRobinPartitioner}} states that it will provide equal distribution of 
records across partitions. However, with the enhancements made in KIP-480, it 
may not. In some cases, when a new batch is started, the partitioner may be 
called a second time for the same record:

[https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L909]

[https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L934]

Each time the partitioner is called, it increments a counter in 
{{RoundRobinPartitioner}}, so this can result in unequal distribution.

The easiest fix might be to decrement the counter in 
{{RoundRobinPartitioner#onNewBatch}}.
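
A minimal sketch of the suggested compensation, assuming a simplified 
round-robin partitioner (the real {{RoundRobinPartitioner}} also skips 
unavailable partitions and uses Utils.toPositive):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CompensatingRoundRobinPartitioner implements Partitioner {
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        int next = counters.computeIfAbsent(topic, t -> new AtomicInteger(0)).getAndIncrement();
        return Math.abs(next % numPartitions); // the real class uses Utils.toPositive
    }

    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        // The producer will call partition() again for the same record, so
        // undo the extra increment to keep the distribution even.
        counters.computeIfAbsent(topic, t -> new AtomicInteger(0)).decrementAndGet();
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}
```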

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9964) Better description of RoundRobinPartitioner behavior for AK 2.4+

2020-05-06 Thread Michael Bingham (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Bingham resolved KAFKA-9964.

Resolution: Invalid

> Better description of RoundRobinPartitioner behavior for AK 2.4+
> 
>
> Key: KAFKA-9964
> URL: https://issues.apache.org/jira/browse/KAFKA-9964
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: Michael Bingham
>Priority: Minor
>
> The Javadocs for {{RoundRobinPartitioner}} currently state:
> {quote}This partitioning strategy can be used when user wants to distribute 
> the writes to all partitions equally
> {quote}
> In AK 2.4+, equal distribution is not guaranteed, even with this partitioner. 
> The enhancements to consider batching made with 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner]
>  affect this partitioner as well.
> So it would be useful to add some additional Javadocs to explain that unless 
> batching is disabled, even distribution of records is not guaranteed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9964) Better description of RoundRobinPartitioner behavior for AK 2.4+

2020-05-06 Thread Michael Bingham (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17101266#comment-17101266
 ] 

Michael Bingham commented on KAFKA-9964:


Closing this out. It turns out the issue is a bug in RoundRobinPartitioner; I 
will file a new ticket for that.

> Better description of RoundRobinPartitioner behavior for AK 2.4+
> 
>
> Key: KAFKA-9964
> URL: https://issues.apache.org/jira/browse/KAFKA-9964
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: Michael Bingham
>Priority: Minor
>
> The Javadocs for {{RoundRobinPartitioner}} currently state:
> {quote}This partitioning strategy can be used when user wants to distribute 
> the writes to all partitions equally
> {quote}
> In AK 2.4+, equal distribution is not guaranteed, even with this partitioner. 
> The enhancements to consider batching made with 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner]
>  affect this partitioner as well.
> So it would be useful to add some additional Javadocs to explain that unless 
> batching is disabled, even distribution of records is not guaranteed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeffhuang26 commented on a change in pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

2020-05-06 Thread GitBox


jeffhuang26 commented on a change in pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#discussion_r421147218



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##
@@ -392,6 +395,98 @@ public void testDisableAdminEndpoint() throws IOException {
 Assert.assertEquals(404, response.getStatusLine().getStatusCode());
 }
 
+    @Test
+    public void testValidCustomizedHttpResponseHeaders() throws IOException {
+        String headerConfig =
+            "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\"";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+        expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate");
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void testDefaultCustomizedHttpResponseHeaders() throws IOException {
+        String headerConfig = "";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigFormat() {
+        String headerConfig = "set add X-XSS-Protection: 1";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedAction() {
+        String headerConfig = "X-Frame-Options: DENY";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedHeaderName() {
+        String headerConfig = "add :DENY";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedHeaderValue() {
+        String headerConfig = "add X-Frame-Options";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigAction() {
+        String headerConfig = "badaction X-XSS-Protection: 1; mode=block";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    public void checkCustomizedHttpResponseHeaders(String headerConfig, Map<String, String> expectedHeaders)
+            throws IOException {
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+            workerConfig,
+            ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
+
+        PowerMock.replayAll();
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        HttpRequest request = new HttpGet("/connectors");
+        CloseableHttpClient httpClient = HttpClients.createMinimal();
+        HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
+        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+        if (!headerConfig.isEmpty()) {
+            expectedHeaders.forEach((k, v) ->
+                Assert.assertEquals(response.getFirstHeader(k).getValue(), v));
+        } else {
+            Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
+        }
+        response.close();
+        server.stop();
+    }
+

Review comment:
   It is good idea  to assert each checking using assertValidHeaderConfig 
and assertInvalidHeaderConfig. I think latest version will cover validation of 
header config. Please let me know. For all valid header config cases, 

[jira] [Updated] (KAFKA-9419) Integer Overflow Possible with CircularIterator

2020-05-06 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-9419:
--
Fix Version/s: 2.5.1
   2.4.2
   2.6.0
   2.3.2

> Integer Overflow Possible with CircularIterator
> ---
>
> Key: KAFKA-9419
> URL: https://issues.apache.org/jira/browse/KAFKA-9419
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Mollitor
>Priority: Minor
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> Very unlikely to happen, but as someone who gets called in when something 
> goes wrong, I'd like to remove as many possibilities as possible.
>  
> [https://github.com/apache/kafka/blob/8c21fa837df6908d9147805e097407d006d95fd4/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java#L39-L43]
>  
> Also, the current implementation will work with a LinkedList, but it won't 
> work well. The repeated calls to `get(i)` make iteration perform at O(n^2). 
> Using an iterator instead allows other Collections to work reasonably well.
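
A minimal sketch of the iterator-based approach described above (an 
illustration of the idea, not the committed patch): restarting a fresh 
Iterator when the current one is exhausted avoids both the ever-growing int 
index (the overflow risk) and O(n) List.get(i) lookups on a LinkedList.

{code:java}
import java.util.Iterator;
import java.util.NoSuchElementException;

public class CircularIteratorSketch<T> implements Iterator<T> {
    private final Iterable<T> iterable;
    private Iterator<T> cursor;

    public CircularIteratorSketch(Iterable<T> iterable) {
        this.iterable = iterable;
        this.cursor = iterable.iterator();
        if (!cursor.hasNext()) {
            throw new NoSuchElementException("CircularIterator can only be used on non-empty collections");
        }
    }

    @Override
    public boolean hasNext() {
        return true; // circular: there is always a next element
    }

    @Override
    public T next() {
        if (!cursor.hasNext()) {
            cursor = iterable.iterator(); // wrap around without any index arithmetic
        }
        return cursor.next();
    }
}
{code}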



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9419) Integer Overflow Possible with CircularIterator

2020-05-06 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-9419.
---
Resolution: Fixed

> Integer Overflow Possible with CircularIterator
> ---
>
> Key: KAFKA-9419
> URL: https://issues.apache.org/jira/browse/KAFKA-9419
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Mollitor
>Priority: Minor
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> Very unlikely to happen, but as someone that gets called in when something 
> goes wrong, I'd like to remove as many possibilities as possible.
>  
> [https://github.com/apache/kafka/blob/8c21fa837df6908d9147805e097407d006d95fd4/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java#L39-L43]
>  
> Also, the current implementation will work with a LinkedList, but it won't 
> work well.  The constant call to `get(i)` will perform at O(n^2).  Using an 
> iterator instead allows other Collections to work reasonably well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeffhuang26 commented on a change in pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

2020-05-06 Thread GitBox


jeffhuang26 commented on a change in pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#discussion_r421147218



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##
@@ -392,6 +395,98 @@ public void testDisableAdminEndpoint() throws IOException {
 Assert.assertEquals(404, response.getStatusLine().getStatusCode());
 }
 
+@Test
+public void testValidCustomizedHttpResponseHeaders() throws IOException  {
+String headerConfig =
+"add X-XSS-Protection: 1; mode=block, \"add Cache-Control: 
no-cache, no-store, must-revalidate\"";
+Map<String, String> expectedHeaders = new HashMap<>();
+expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+expectedHeaders.put("Cache-Control", "no-cache, no-store, 
must-revalidate");
+checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+}
+
+@Test
+public void testDefaultCustomizedHttpResponseHeaders() throws IOException  
{
+String headerConfig = "";
+Map<String, String> expectedHeaders = new HashMap<>();
+checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+}
+
+@Test(expected = ConfigException.class)
+public void testInvalidHeaderConfigFormat() {
+String headerConfig = "set add X-XSS-Protection: 1";
+Map<String, String> workerProps = baseWorkerProps();
+workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, 
headerConfig);
+WorkerConfig workerConfig = new DistributedConfig(workerProps);
+}
+
+@Test(expected = ConfigException.class)
+public void testMissedAction() {
+String headerConfig = "X-Frame-Options: DENY";
+Map<String, String> workerProps = baseWorkerProps();
+workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, 
headerConfig);
+WorkerConfig workerConfig = new DistributedConfig(workerProps);
+}
+
+@Test(expected = ConfigException.class)
+public void testMissedHeaderName() {
+String headerConfig = "add :DENY";
+Map<String, String> workerProps = baseWorkerProps();
+workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, 
headerConfig);
+WorkerConfig workerConfig = new DistributedConfig(workerProps);
+}
+
+@Test(expected = ConfigException.class)
+public void testMissedHeaderValue() {
+String headerConfig = "add X-Frame-Options";
+Map<String, String> workerProps = baseWorkerProps();
+workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, 
headerConfig);
+WorkerConfig workerConfig = new DistributedConfig(workerProps);
+}
+
+@Test(expected = ConfigException.class)
+public void testInvalidHeaderConfigAction() {
+String headerConfig = "badaction X-XSS-Protection: 1; mode=block";
+Map<String, String> workerProps = baseWorkerProps();
+workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, 
headerConfig);
+WorkerConfig workerConfig = new DistributedConfig(workerProps);
+}
+
+public void checkCustomizedHttpResponseHeaders(String headerConfig, 
Map<String, String> expectedHeaders)
+throws IOException  {
+Map<String, String> workerProps = baseWorkerProps();
+workerProps.put("offset.storage.file.filename", "/tmp");
+workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, 
headerConfig);
+WorkerConfig workerConfig = new DistributedConfig(workerProps);
+
+EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+workerConfig,
+
ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", 
"b"));
+
+PowerMock.replayAll();
+
+server = new RestServer(workerConfig);
+server.initializeServer();
+server.initializeResources(herder);
+HttpRequest request = new HttpGet("/connectors");
+CloseableHttpClient httpClient = HttpClients.createMinimal();
+HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), 
server.advertisedUrl().getPort());
+CloseableHttpResponse response = httpClient.execute(httpHost, request);
+Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+if (!headerConfig.isEmpty()) {
+expectedHeaders.forEach((k, v) ->
+Assert.assertEquals(response.getFirstHeader(k).getValue(), 
v));
+} else {
+Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
+}
+response.close();
+server.stop();
+}
+

Review comment:
   It is a good idea to assert each check using assertValidHeaderConfig 
and assertInvalidHeaderConfig. I think the latest version will cover validation 
of the header config. Please let me know. 
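
   For reference, a sketch of what such helpers could look like (the method 
names are from this review thread; the bodies are illustrative, not the PR's 
actual code):

```java
// Assumed context: these would live in RestServerTest alongside the tests above.
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.junit.Assert;

private void assertValidHeaderConfig(String headerConfig) {
    Map<String, String> workerProps = baseWorkerProps();
    workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
    new DistributedConfig(workerProps); // should not throw
}

private void assertInvalidHeaderConfig(String headerConfig) {
    Map<String, String> workerProps = baseWorkerProps();
    workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
    try {
        new DistributedConfig(workerProps);
        Assert.fail("Expected ConfigException for header config: " + headerConfig);
    } catch (ConfigException e) {
        // expected: an invalid header config is rejected at config parse time
    }
}
```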





[GitHub] [kafka] kkonstantine commented on pull request #7950: KAFKA-9419: Integer Overflow Possible with CircularIterator

2020-05-06 Thread GitBox


kkonstantine commented on pull request #7950:
URL: https://github.com/apache/kafka/pull/7950#issuecomment-624933756


   jdk 8: stellar
   jdk 11: 2 streams eos test failures: unrelated
   jdk 14: 1 more unrelated failure 
(`kafka.api.TransactionsTest.testBumpTransactionalEpoch`)
   
   Given this outcome, I'm merging. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-9964) Better description of RoundRobinPartitioner behavior for AK 2.4+

2020-05-06 Thread Michael Bingham (Jira)
Michael Bingham created KAFKA-9964:
--

 Summary: Better description of RoundRobinPartitioner behavior for 
AK 2.4+
 Key: KAFKA-9964
 URL: https://issues.apache.org/jira/browse/KAFKA-9964
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 2.4.1, 2.5.0, 2.4.0
Reporter: Michael Bingham


The Javadocs for {{RoundRobinPartitioner}} currently state:
{quote}This partitioning strategy can be used when user wants to distribute the 
writes to all partitions equally
{quote}
In AK 2.4+, equal distribution is not guaranteed, even with this partitioner. 
The batching-aware enhancements made with 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner]
 affect this partitioner as well.


So it would be useful to add some additional Javadocs to explain that unless 
batching is disabled, even distribution of records is not guaranteed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9947) TransactionsBounceTest may leave threads running

2020-05-06 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9947.

Resolution: Fixed

> TransactionsBounceTest may leave threads running
> 
>
> Key: KAFKA-9947
> URL: https://issues.apache.org/jira/browse/KAFKA-9947
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> I saw this failure recently:
> ```
> 14:28:23 kafka.api.TransactionsBounceTest > testWithGroupId FAILED
> 14:28:23 org.scalatest.exceptions.TestFailedException: Consumed 0 records 
> before timeout instead of the expected 200 records
> 14:28:23 at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
> 14:28:23 at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
> 14:28:23 at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
> 14:28:23 at org.scalatest.Assertions.fail(Assertions.scala:1091)
> 14:28:23 at org.scalatest.Assertions.fail$(Assertions.scala:1087)
> 14:28:23 at org.scalatest.Assertions$.fail(Assertions.scala:1389)
> 14:28:23 at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:843)
> 14:28:23 at 
> kafka.api.TransactionsBounceTest.testWithGroupId(TransactionsBounceTest.scala:110)
> ```
> This was followed by a bunch of test failures such as the following:
> ```
> 14:28:38 kafka.api.TransactionsBounceTest > classMethod FAILED
> 14:28:38 java.lang.AssertionError: Found unexpected threads during 
> @AfterClass, allThreads=HashSet(controller-event-thread, 
> ExpirationReaper-0-topic, ExpirationReaper-0-ElectLeader, 
> ExpirationReaper-0-Heartbeat, metrics-meter-tick-thread-2, main, 
> metrics-meter-tick-thread-1, 
> data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-41287, 
> scala-execution-context-global-246, transaction-log-manager-0, Reference 
> Handler, scala-execution-context-global-24107, /127.0.0.1:35460 to 
> /127.0.0.1:42451 workers Thread 2, /127.0.0.1:35460 to /127.0.0.1:42451 
> workers Thread 3, kafka-log-cleaner-thread-0, ExpirationReaper-0-Fetch, 
> scala-execution-context-global-12253, ExpirationReaper-0-Rebalance, 
> Common-Cleaner, daemon-broker-bouncer-EventThread, Signal Dispatcher, 
> SensorExpiryThread, daemon-broker-bouncer-SendThread(127.0.0.1:32919), 
> kafka-scheduler-0, kafka-scheduler-3, kafka-scheduler-4, kafka-scheduler-1, 
> kafka-scheduler-2, kafka-scheduler-7, ExpirationReaper-0-DeleteRecords, 
> kafka-scheduler-8, kafka-scheduler-5, kafka-scheduler-6, 
> scala-execution-context-global-4200, kafka-scheduler-9, LogDirFailureHandler, 
> TxnMarkerSenderThread-0, /config/changes-event-process-thread, 
> ExpirationReaper-0-AlterAcls, group-metadata-manager-0, Test worker, 
> Finalizer, scala-execution-context-global-4199, 
> ThrottledChannelReaper-Produce, data-plane-kafka-request-handler-3, 
> data-plane-kafka-request-handler-2, Controller-0-to-broker-0-send-thread, 
> data-plane-kafka-request-handler-1, 
> data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-1, 
> data-plane-kafka-request-handler-0, 
> data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0, 
> data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-2, 
> scala-execution-context-global-572, scala-execution-context-global-573, 
> data-plane-kafka-request-handler-7, data-plane-kafka-request-handler-6, 
> data-plane-kafka-request-handler-5, scala-execution-context-global-137, 
> data-plane-kafka-request-handler-4, ThrottledChannelReaper-Request, 
> ExpirationReaper-0-Produce, ThrottledChannelReaper-Fetch), 
> unexpected=HashSet(controller-event-thread, daemon-broker-bouncer-EventThread)
> ```
> The test case needs to ensure that `BounceScheduler` gets shut down properly.
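
As a generic sketch of the cleanup pattern implied by that last line 
(illustrative Java; the actual test is Scala and its BounceScheduler differs): 
stop the background bouncer in a finally block so a consume-timeout failure 
cannot leak its thread into later tests.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BounceCleanupSketch {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService bouncer = Executors.newSingleThreadScheduledExecutor();
        bouncer.scheduleAtFixedRate(() -> { /* bounce a broker */ }, 0, 5, TimeUnit.SECONDS);
        try {
            // run the transactional produce/consume workload under test here;
            // an assertion failure would throw out of this block
        } finally {
            bouncer.shutdownNow(); // executes even on the failure path
            bouncer.awaitTermination(30, TimeUnit.SECONDS);
        }
    }
}
{code}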



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9963) High CPU during replication of replaced (empty) broker

2020-05-06 Thread Evan Williams (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evan Williams updated KAFKA-9963:
-
Description: 
When replacing a broker, with an empty data dir, and the same broker ID - we 
are seeing very high CPU usage during replication, generally up to 100% for 
some time, on a 4 VCPU (EC2 R5) host.  This is a 6 host cluster, with approx 
1000 topics and 3000 partitions.

 

There is of course traffic being served as well, as it catches up and becomes 
leader of partitions; however, due to the high replication CPU usage, clients 
start to have connection issues. Normal traffic on this host is around 40% 
CPU, when it's completely replicated. I'm not sure if this is 'normal' and we 
just need to throw more resources at the hosts, or if replication should not 
really use this much CPU.

replica.fetchers=2 (not high). And the Java version is: OpenJDK Runtime Environment 
Corretto-8.252.09.1 (build 1.8.0_252-b09) (AWS's own build).

CPU profiling (2 minutes) during this 'replace' scenario shows this:

 
{code:java}
 5473000   19.43% 5473  java.util.TreeMap$PrivateEntryIterator.nextEntry
 4975000   17.66% 4975  scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
 4417000   15.68% 4417  java.util.TreeMap.successor
 1773000    6.29% 1773  java.util.TreeMap$ValueIterator.next
 1700000    6.03% 1700  java.util.TreeMap$PrivateEntryIterator.hasNext
  601000    2.13%  601  scala.collection.convert.Wrappers$JIteratorWrapper.next
  516000    1.83%  516  writev



--- 3885000 ns (13.79%), 3885 samples
  [ 0] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 1] java.util.TreeMap$ValueIterator.next
  [ 2] scala.collection.convert.Wrappers$JIteratorWrapper.next
  [ 3] scala.collection.Iterator.find
  [ 4] scala.collection.Iterator.find$
  [ 5] scala.collection.AbstractIterator.find
  [ 6] scala.collection.IterableLike.find
  [ 7] scala.collection.IterableLike.find$
  [ 8] scala.collection.AbstractIterable.find
  [ 9] kafka.log.ProducerStateManager.lastStableOffset
  [10] kafka.log.Log.$anonfun$append$12
  [11] kafka.log.Log.$anonfun$append$2
  [12] kafka.log.Log.append
  [13] kafka.log.Log.appendAsFollower
  [14] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [15] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [16] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [17] kafka.server.ReplicaFetcherThread.processPartitionData
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [19] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [20] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [21] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [22] scala.collection.mutable.ResizableArray.foreach
  [23] scala.collection.mutable.ResizableArray.foreach$
  [24] scala.collection.mutable.ArrayBuffer.foreach
  [25] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [26] kafka.server.AbstractFetcherThread.processFetchRequest
  [27] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [28] kafka.server.AbstractFetcherThread.maybeFetch
  [29] kafka.server.AbstractFetcherThread.doWork
  [30] kafka.utils.ShutdownableThread.run

   
{code}
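
For context on the frames above: per the stack traces, every appended batch 
walks a TreeMap's values via an iterator-backed find. An illustrative Java 
reduction of that access pattern (the actual hot path is the Scala code in the 
stack, not this class):

{code:java}
import java.util.TreeMap;
import java.util.function.Predicate;

public class TreeMapScanSketch {
    // find() over a TreeMap's values walks entries one by one
    // (nextEntry/successor in the profile) until the predicate matches:
    // O(n) per call, and here it runs once per appended batch.
    static <K, V> V findFirstMatch(TreeMap<K, V> map, Predicate<V> matches) {
        for (V value : map.values()) {
            if (matches.test(value)) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> ongoingTxns = new TreeMap<>(); // keyed by first offset
        ongoingTxns.put(100L, "txn-a");
        ongoingTxns.put(250L, "txn-b");
        System.out.println(findFirstMatch(ongoingTxns, v -> v.startsWith("txn")));
    }
}
{code}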
{code:java}
  --- 3632000 ns (12.89%), 3632 samples
  [ 0] scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
  [ 1] scala.collection.Iterator.find
  [ 2] scala.collection.Iterator.find$
  [ 3] scala.collection.AbstractIterator.find
  [ 4] scala.collection.IterableLike.find
  [ 5] scala.collection.IterableLike.find$
  [ 6] scala.collection.AbstractIterable.find
  [ 7] kafka.log.ProducerStateManager.lastStableOffset
  [ 8] kafka.log.Log.$anonfun$append$12
  [ 9] kafka.log.Log.$anonfun$append$2
  [10] kafka.log.Log.append
  [11] kafka.log.Log.appendAsFollower
  [12] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [13] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [14] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [15] kafka.server.ReplicaFetcherThread.processPartitionData
  [16] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [17] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [19] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [20] scala.collection.mutable.ResizableArray.foreach
  [21] scala.collection.mutable.ResizableArray.foreach$
  [22] scala.collection.mutable.ArrayBuffer.foreach
  [23] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [24] kafka.server.AbstractFetcherThread.processFetchRequest
  [25] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [26] kafka.server.AbstractFetcherThread.maybeFetch
  [27] 

[jira] [Updated] (KAFKA-9963) High CPU during replication of replaced (empty) broker

2020-05-06 Thread Evan Williams (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evan Williams updated KAFKA-9963:
-
Description: 
When replacing a broker, with an empty data dir, and the same broker ID - we 
are seeing very high CPU usage during replication, generally up to 100% for 
some time, on a 4 VCPU (EC2 R5) host.  This is a 6 host cluster, with approx 
1000 topics and 3000 partitions.

 

There is of course traffic being served as well, as it catches up and becomes 
leader of partitions; however, due to the high replication CPU usage, clients 
start to have connection issues. Normal traffic on this host is around 40% 
CPU, when it's completely replicated. I'm not sure if this is 'normal' and we 
just need to throw more resources at the hosts, or if replication should not 
really use this much CPU.

replica.fetchers=2 (not high). And the Java version is: OpenJDK Runtime Environment 
Corretto-8.252.09.1 (build 1.8.0_252-b09) (AWS's own build).

CPU profiling (2 minutes) during this 'replace' scenario shows this:

 
{code:java}
 5473000   19.43% 5473  java.util.TreeMap$PrivateEntryIterator.nextEntry
 4975000   17.66% 4975  scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
 4417000   15.68% 4417  java.util.TreeMap.successor
 1773000    6.29% 1773  java.util.TreeMap$ValueIterator.next
 1700000    6.03% 1700  java.util.TreeMap$PrivateEntryIterator.hasNext
  601000    2.13%  601  scala.collection.convert.Wrappers$JIteratorWrapper.next
  516000    1.83%  516  writev



--- 3885000 ns (13.79%), 3885 samples
  [ 0] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 1] java.util.TreeMap$ValueIterator.next
  [ 2] scala.collection.convert.Wrappers$JIteratorWrapper.next
  [ 3] scala.collection.Iterator.find
  [ 4] scala.collection.Iterator.find$
  [ 5] scala.collection.AbstractIterator.find
  [ 6] scala.collection.IterableLike.find
  [ 7] scala.collection.IterableLike.find$
  [ 8] scala.collection.AbstractIterable.find
  [ 9] kafka.log.ProducerStateManager.lastStableOffset
  [10] kafka.log.Log.$anonfun$append$12
  [11] kafka.log.Log.$anonfun$append$2
  [12] kafka.log.Log.append
  [13] kafka.log.Log.appendAsFollower
  [14] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [15] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [16] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [17] kafka.server.ReplicaFetcherThread.processPartitionData
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [19] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [20] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [21] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [22] scala.collection.mutable.ResizableArray.foreach
  [23] scala.collection.mutable.ResizableArray.foreach$
  [24] scala.collection.mutable.ArrayBuffer.foreach
  [25] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [26] kafka.server.AbstractFetcherThread.processFetchRequest
  [27] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [28] kafka.server.AbstractFetcherThread.maybeFetch
  [29] kafka.server.AbstractFetcherThread.doWork
  [30] kafka.utils.ShutdownableThread.run

   

  --- 3632000 ns (12.89%), 3632 samples
  [ 0] scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
  [ 1] scala.collection.Iterator.find
  [ 2] scala.collection.Iterator.find$
  [ 3] scala.collection.AbstractIterator.find
  [ 4] scala.collection.IterableLike.find
  [ 5] scala.collection.IterableLike.find$
  [ 6] scala.collection.AbstractIterable.find
  [ 7] kafka.log.ProducerStateManager.lastStableOffset
  [ 8] kafka.log.Log.$anonfun$append$12
  [ 9] kafka.log.Log.$anonfun$append$2
  [10] kafka.log.Log.append
  [11] kafka.log.Log.appendAsFollower
  [12] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [13] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [14] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [15] kafka.server.ReplicaFetcherThread.processPartitionData
  [16] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [17] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [19] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [20] scala.collection.mutable.ResizableArray.foreach
  [21] scala.collection.mutable.ResizableArray.foreach$
  [22] scala.collection.mutable.ArrayBuffer.foreach
  [23] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [24] kafka.server.AbstractFetcherThread.processFetchRequest
  [25] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [26] kafka.server.AbstractFetcherThread.maybeFetch
  [27] 

[jira] [Updated] (KAFKA-9963) High CPU during replication of replaced (empty) broker

2020-05-06 Thread Evan Williams (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evan Williams updated KAFKA-9963:
-
Summary: High CPU during replication of replaced (empty) broker  (was: High 
CPU)

> High CPU during replication of replaced (empty) broker
> --
>
> Key: KAFKA-9963
> URL: https://issues.apache.org/jira/browse/KAFKA-9963
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Evan Williams
>Priority: Major
>
> When replacing a broker, with an empty data dir, and the same broker ID - we 
> are seeing very high CPU usage during replication, generally up to 100% for 
> some time, on a 4 VCPU (EC2 R5) host.  This is a 6 host cluster, with approx 
> 1000 topics and 3000 partitions.
>  
> There is of course traffic being served as well, as it catches up and becomes 
> leader of partitions; however, due to the high replication CPU usage, clients 
> start to have connection issues. Normal traffic on this host is around 40% 
> CPU, when it's completely replicated. I'm not sure if this is 'normal' and we 
> just need to throw more resources at the hosts, or if replication should not 
> really use this much CPU.
> replica.fetchers=2 (not high). And the Java version is: OpenJDK Runtime 
> Environment Corretto-8.252.09.1 (build 1.8.0_252-b09) (AWS's own build).
> CPU profiling (2 minutes) during this 'replace' scenario shows this:
>  
> {code:java}
>  5473000   19.43% 5473  java.util.TreeMap$PrivateEntryIterator.nextEntry
>  4975000   17.66% 4975  scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
>  4417000   15.68% 4417  java.util.TreeMap.successor
>  1773000    6.29% 1773  java.util.TreeMap$ValueIterator.next
>  1700000    6.03% 1700  java.util.TreeMap$PrivateEntryIterator.hasNext
>   601000    2.13%  601  scala.collection.convert.Wrappers$JIteratorWrapper.next
>   516000    1.83%  516  writev
> --- 3885000 ns (13.79%), 3885 samples
>   [ 0] java.util.TreeMap$PrivateEntryIterator.nextEntry
>   [ 1] java.util.TreeMap$ValueIterator.next
>   [ 2] scala.collection.convert.Wrappers$JIteratorWrapper.next
>   [ 3] scala.collection.Iterator.find
>   [ 4] scala.collection.Iterator.find$
>   [ 5] scala.collection.AbstractIterator.find
>   [ 6] scala.collection.IterableLike.find
>   [ 7] scala.collection.IterableLike.find$
>   [ 8] scala.collection.AbstractIterable.find
>   [ 9] kafka.log.ProducerStateManager.lastStableOffset
>   [10] kafka.log.Log.$anonfun$append$12
>   [11] kafka.log.Log.$anonfun$append$2
>   [12] kafka.log.Log.append
>   [13] kafka.log.Log.appendAsFollower
>   [14] 
> kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
>   [15] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
>   [16] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
>   [17] kafka.server.ReplicaFetcherThread.processPartitionData
>   [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
>   [19] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
>   [20] 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
>   [21] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
>   [22] scala.collection.mutable.ResizableArray.foreach
>   [23] scala.collection.mutable.ResizableArray.foreach$
>   [24] scala.collection.mutable.ArrayBuffer.foreach
>   [25] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
>   [26] kafka.server.AbstractFetcherThread.processFetchRequest
>   [27] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
>   [28] kafka.server.AbstractFetcherThread.maybeFetch
>   [29] kafka.server.AbstractFetcherThread.doWork
>   [30] kafka.utils.ShutdownableThread.run
>
> --- 3632000 ns (12.89%), 3632 samples
>   [ 0] scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
>   [ 1] scala.collection.Iterator.find
>   [ 2] scala.collection.Iterator.find$
>   [ 3] scala.collection.AbstractIterator.find
>   [ 4] scala.collection.IterableLike.find
>   [ 5] scala.collection.IterableLike.find$
>   [ 6] scala.collection.AbstractIterable.find
>   [ 7] kafka.log.ProducerStateManager.lastStableOffset
>   [ 8] kafka.log.Log.$anonfun$append$12
>   [ 9] kafka.log.Log.$anonfun$append$2
>   [10] kafka.log.Log.append
>   [11] kafka.log.Log.appendAsFollower
>   [12] 
> kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
>   [13] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
>   [14] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
>   [15] kafka.server.ReplicaFetcherThread.processPartitionData
>   [16] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
>   [17] 

[jira] [Updated] (KAFKA-9963) High CPU

2020-05-06 Thread Evan Williams (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evan Williams updated KAFKA-9963:
-
Description: 
When replacing a broker, with an empty data dir, and the same broker ID - we 
are seeing very high CPU usage during replication, generally up to 100% for 
some time, on a 4 VCPU (EC2 R5) host.  This is a 6 host cluster, with approx 
1000 topics and 3000 partitions.

 

There is of course traffic being served as well, as it catches up and becomes 
leader of partitions; however, due to the high replication CPU usage, clients 
start to have connection issues. Normal traffic on this host is around 40% 
CPU, when it's completely replicated. I'm not sure if this is 'normal' and we 
just need to throw more resources at the hosts, or if replication should not 
really use this much CPU.

replica.fetchers=2 (not high). And the Java version is: OpenJDK Runtime Environment 
Corretto-8.252.09.1 (build 1.8.0_252-b09) (AWS's own build).

CPU profiling (2 minutes) during this 'replace' scenario shows this:

 
{code:java}
 5473000   19.43% 5473  java.util.TreeMap$PrivateEntryIterator.nextEntry
 4975000   17.66% 4975  scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
 4417000   15.68% 4417  java.util.TreeMap.successor
 1773000    6.29% 1773  java.util.TreeMap$ValueIterator.next
 1700000    6.03% 1700  java.util.TreeMap$PrivateEntryIterator.hasNext
  601000    2.13%  601  scala.collection.convert.Wrappers$JIteratorWrapper.next
  516000    1.83%  516  writev



--- 3885000 ns (13.79%), 3885 samples
  [ 0] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 1] java.util.TreeMap$ValueIterator.next
  [ 2] scala.collection.convert.Wrappers$JIteratorWrapper.next
  [ 3] scala.collection.Iterator.find
  [ 4] scala.collection.Iterator.find$
  [ 5] scala.collection.AbstractIterator.find
  [ 6] scala.collection.IterableLike.find
  [ 7] scala.collection.IterableLike.find$
  [ 8] scala.collection.AbstractIterable.find
  [ 9] kafka.log.ProducerStateManager.lastStableOffset
  [10] kafka.log.Log.$anonfun$append$12
  [11] kafka.log.Log.$anonfun$append$2
  [12] kafka.log.Log.append
  [13] kafka.log.Log.appendAsFollower
  [14] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [15] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [16] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [17] kafka.server.ReplicaFetcherThread.processPartitionData
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [19] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [20] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [21] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [22] scala.collection.mutable.ResizableArray.foreach
  [23] scala.collection.mutable.ResizableArray.foreach$
  [24] scala.collection.mutable.ArrayBuffer.foreach
  [25] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [26] kafka.server.AbstractFetcherThread.processFetchRequest
  [27] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [28] kafka.server.AbstractFetcherThread.maybeFetch
  [29] kafka.server.AbstractFetcherThread.doWork
  [30] kafka.utils.ShutdownableThread.run

--- 3632000 ns (12.89%), 3632 samples
  [ 0] scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
  [ 1] scala.collection.Iterator.find
  [ 2] scala.collection.Iterator.find$
  [ 3] scala.collection.AbstractIterator.find
  [ 4] scala.collection.IterableLike.find
  [ 5] scala.collection.IterableLike.find$
  [ 6] scala.collection.AbstractIterable.find
  [ 7] kafka.log.ProducerStateManager.lastStableOffset
  [ 8] kafka.log.Log.$anonfun$append$12
  [ 9] kafka.log.Log.$anonfun$append$2
  [10] kafka.log.Log.append
  [11] kafka.log.Log.appendAsFollower
  [12] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [13] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [14] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [15] kafka.server.ReplicaFetcherThread.processPartitionData
  [16] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [17] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [19] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [20] scala.collection.mutable.ResizableArray.foreach
  [21] scala.collection.mutable.ResizableArray.foreach$
  [22] scala.collection.mutable.ArrayBuffer.foreach
  [23] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [24] kafka.server.AbstractFetcherThread.processFetchRequest
  [25] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [26] kafka.server.AbstractFetcherThread.maybeFetch
  [27] 

[jira] [Updated] (KAFKA-9963) High CPU

2020-05-06 Thread Evan Williams (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evan Williams updated KAFKA-9963:
-
Description: 
When replacing a broker, with an empty data dir, and the same broker ID - we 
are seeing very high CPU usage during replication, generally up to 100% for 
some time, on a 4 VCPU (EC2 R5) host.  This is a 6 host cluster, with approx 
1000 topics and 3000 partitions.

 

There is of course traffic being served as well, as it catches up and becomes 
leader of partitions; however, due to the high replication CPU usage, clients 
start to have connection issues. Normal traffic on this host is around 40% 
CPU, when it's completely replicated. I'm not sure if this is 'normal' and we 
just need to throw more resources at the hosts, or if replication should not 
really use this much CPU.

CPU profiling (2 minutes) during this 'replace' scenario shows this:

 
{code:java}
 5473000   19.43% 5473  java.util.TreeMap$PrivateEntryIterator.nextEntry
 4975000   17.66% 4975  scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
 4417000   15.68% 4417  java.util.TreeMap.successor
 1773000    6.29% 1773  java.util.TreeMap$ValueIterator.next
 1700000    6.03% 1700  java.util.TreeMap$PrivateEntryIterator.hasNext
  601000    2.13%  601  scala.collection.convert.Wrappers$JIteratorWrapper.next
  516000    1.83%  516  writev



--- 3885000 ns (13.79%), 3885 samples
  [ 0] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 1] java.util.TreeMap$ValueIterator.next
  [ 2] scala.collection.convert.Wrappers$JIteratorWrapper.next
  [ 3] scala.collection.Iterator.find
  [ 4] scala.collection.Iterator.find$
  [ 5] scala.collection.AbstractIterator.find
  [ 6] scala.collection.IterableLike.find
  [ 7] scala.collection.IterableLike.find$
  [ 8] scala.collection.AbstractIterable.find
  [ 9] kafka.log.ProducerStateManager.lastStableOffset
  [10] kafka.log.Log.$anonfun$append$12
  [11] kafka.log.Log.$anonfun$append$2
  [12] kafka.log.Log.append
  [13] kafka.log.Log.appendAsFollower
  [14] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [15] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [16] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [17] kafka.server.ReplicaFetcherThread.processPartitionData
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [19] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [20] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [21] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [22] scala.collection.mutable.ResizableArray.foreach
  [23] scala.collection.mutable.ResizableArray.foreach$
  [24] scala.collection.mutable.ArrayBuffer.foreach
  [25] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [26] kafka.server.AbstractFetcherThread.processFetchRequest
  [27] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [28] kafka.server.AbstractFetcherThread.maybeFetch
  [29] kafka.server.AbstractFetcherThread.doWork
  [30] kafka.utils.ShutdownableThread.run

--- 3632000 ns (12.89%), 3632 samples
  [ 0] scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
  [ 1] scala.collection.Iterator.find
  [ 2] scala.collection.Iterator.find$
  [ 3] scala.collection.AbstractIterator.find
  [ 4] scala.collection.IterableLike.find
  [ 5] scala.collection.IterableLike.find$
  [ 6] scala.collection.AbstractIterable.find
  [ 7] kafka.log.ProducerStateManager.lastStableOffset
  [ 8] kafka.log.Log.$anonfun$append$12
  [ 9] kafka.log.Log.$anonfun$append$2
  [10] kafka.log.Log.append
  [11] kafka.log.Log.appendAsFollower
  [12] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [13] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [14] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [15] kafka.server.ReplicaFetcherThread.processPartitionData
  [16] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [17] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [19] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [20] scala.collection.mutable.ResizableArray.foreach
  [21] scala.collection.mutable.ResizableArray.foreach$
  [22] scala.collection.mutable.ArrayBuffer.foreach
  [23] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [24] kafka.server.AbstractFetcherThread.processFetchRequest
  [25] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [26] kafka.server.AbstractFetcherThread.maybeFetch
  [27] kafka.server.AbstractFetcherThread.doWork
  [28] kafka.utils.ShutdownableThread.run

--- 3236000 ns (11.49%), 3236 samples
  [ 0] java.util.TreeMap.successor
  [ 1] 

[jira] [Updated] (KAFKA-9963) High CPU

2020-05-06 Thread Evan Williams (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evan Williams updated KAFKA-9963:
-
Description: 
When replacing a broker, with an empty data dir, and the same broker ID - we 
are seeing very high CPU usage during replication, generally up to 100% for 
some time, on a 4 VCPU (EC2 R5) host.  This is a 6 host cluster, with approx 
1000 topics and 3000 partitions.

 

There is of course traffic being served as well, as it catches up and becomes 
leader of partitions; however, due to the high replication CPU usage, clients 
start to have connection issues. Normal traffic on this host is around 40% 
CPU, when it's completely replicated.

CPU profiling during this 'replace' scenario shows this:

 
{code:java}
 5473000   19.43% 5473  java.util.TreeMap$PrivateEntryIterator.nextEntry
 4975000   17.66% 4975  scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
 4417000   15.68% 4417  java.util.TreeMap.successor
 1773000    6.29% 1773  java.util.TreeMap$ValueIterator.next
 1700000    6.03% 1700  java.util.TreeMap$PrivateEntryIterator.hasNext
  601000    2.13%  601  scala.collection.convert.Wrappers$JIteratorWrapper.next
  516000    1.83%  516  writev



--- 3885000 ns (13.79%), 3885 samples
  [ 0] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 1] java.util.TreeMap$ValueIterator.next
  [ 2] scala.collection.convert.Wrappers$JIteratorWrapper.next
  [ 3] scala.collection.Iterator.find
  [ 4] scala.collection.Iterator.find$
  [ 5] scala.collection.AbstractIterator.find
  [ 6] scala.collection.IterableLike.find
  [ 7] scala.collection.IterableLike.find$
  [ 8] scala.collection.AbstractIterable.find
  [ 9] kafka.log.ProducerStateManager.lastStableOffset
  [10] kafka.log.Log.$anonfun$append$12
  [11] kafka.log.Log.$anonfun$append$2
  [12] kafka.log.Log.append
  [13] kafka.log.Log.appendAsFollower
  [14] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [15] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [16] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [17] kafka.server.ReplicaFetcherThread.processPartitionData
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [19] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [20] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [21] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [22] scala.collection.mutable.ResizableArray.foreach
  [23] scala.collection.mutable.ResizableArray.foreach$
  [24] scala.collection.mutable.ArrayBuffer.foreach
  [25] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [26] kafka.server.AbstractFetcherThread.processFetchRequest
  [27] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [28] kafka.server.AbstractFetcherThread.maybeFetch
  [29] kafka.server.AbstractFetcherThread.doWork
  [30] kafka.utils.ShutdownableThread.run

--- 3632000 ns (12.89%), 3632 samples
  [ 0] scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
  [ 1] scala.collection.Iterator.find
  [ 2] scala.collection.Iterator.find$
  [ 3] scala.collection.AbstractIterator.find
  [ 4] scala.collection.IterableLike.find
  [ 5] scala.collection.IterableLike.find$
  [ 6] scala.collection.AbstractIterable.find
  [ 7] kafka.log.ProducerStateManager.lastStableOffset
  [ 8] kafka.log.Log.$anonfun$append$12
  [ 9] kafka.log.Log.$anonfun$append$2
  [10] kafka.log.Log.append
  [11] kafka.log.Log.appendAsFollower
  [12] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [13] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [14] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [15] kafka.server.ReplicaFetcherThread.processPartitionData
  [16] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [17] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [19] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [20] scala.collection.mutable.ResizableArray.foreach
  [21] scala.collection.mutable.ResizableArray.foreach$
  [22] scala.collection.mutable.ArrayBuffer.foreach
  [23] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [24] kafka.server.AbstractFetcherThread.processFetchRequest
  [25] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [26] kafka.server.AbstractFetcherThread.maybeFetch
  [27] kafka.server.AbstractFetcherThread.doWork
  [28] kafka.utils.ShutdownableThread.run

--- 3236000 ns (11.49%), 3236 samples
  [ 0] java.util.TreeMap.successor
  [ 1] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 2] java.util.TreeMap$ValueIterator.next
  [ 3] scala.collection.convert.Wrappers$JIteratorWrapper.next
  [ 4] 

[jira] [Updated] (KAFKA-9963) High CPU

2020-05-06 Thread Evan Williams (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evan Williams updated KAFKA-9963:
-
Description: 
When replacing a broker, with an empty data dir, and the same broker ID - we 
are seeing very high CPU usage during replication, generally up to 100% for 
some time, on a 4 VCPU (EC2 R5) host.  This is a 6 host cluster, with approx 
1000 topics and 3000 partitions.

 

There is of course traffic being served as well, as it catches up and becomes 
leader of partitions; however, due to the high replication CPU usage, clients 
start to have connection issues. Normal traffic on this host is around 40% 
CPU, when it's completely replicated.

CPU profiling (2 minutes) during this 'replace' scenario shows this:

 
{code:java}
 5473000   19.43% 5473  java.util.TreeMap$PrivateEntryIterator.nextEntry
 4975000   17.66% 4975  scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
 4417000   15.68% 4417  java.util.TreeMap.successor
 1773000    6.29% 1773  java.util.TreeMap$ValueIterator.next
 1700000    6.03% 1700  java.util.TreeMap$PrivateEntryIterator.hasNext
  601000    2.13%  601  scala.collection.convert.Wrappers$JIteratorWrapper.next
  516000    1.83%  516  writev



--- 3885000 ns (13.79%), 3885 samples
  [ 0] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 1] java.util.TreeMap$ValueIterator.next
  [ 2] scala.collection.convert.Wrappers$JIteratorWrapper.next
  [ 3] scala.collection.Iterator.find
  [ 4] scala.collection.Iterator.find$
  [ 5] scala.collection.AbstractIterator.find
  [ 6] scala.collection.IterableLike.find
  [ 7] scala.collection.IterableLike.find$
  [ 8] scala.collection.AbstractIterable.find
  [ 9] kafka.log.ProducerStateManager.lastStableOffset
  [10] kafka.log.Log.$anonfun$append$12
  [11] kafka.log.Log.$anonfun$append$2
  [12] kafka.log.Log.append
  [13] kafka.log.Log.appendAsFollower
  [14] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [15] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [16] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [17] kafka.server.ReplicaFetcherThread.processPartitionData
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [19] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [20] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [21] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [22] scala.collection.mutable.ResizableArray.foreach
  [23] scala.collection.mutable.ResizableArray.foreach$
  [24] scala.collection.mutable.ArrayBuffer.foreach
  [25] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [26] kafka.server.AbstractFetcherThread.processFetchRequest
  [27] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [28] kafka.server.AbstractFetcherThread.maybeFetch
  [29] kafka.server.AbstractFetcherThread.doWork
  [30] kafka.utils.ShutdownableThread.run

--- 3632000 ns (12.89%), 3632 samples
  [ 0] scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
  [ 1] scala.collection.Iterator.find
  [ 2] scala.collection.Iterator.find$
  [ 3] scala.collection.AbstractIterator.find
  [ 4] scala.collection.IterableLike.find
  [ 5] scala.collection.IterableLike.find$
  [ 6] scala.collection.AbstractIterable.find
  [ 7] kafka.log.ProducerStateManager.lastStableOffset
  [ 8] kafka.log.Log.$anonfun$append$12
  [ 9] kafka.log.Log.$anonfun$append$2
  [10] kafka.log.Log.append
  [11] kafka.log.Log.appendAsFollower
  [12] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [13] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [14] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [15] kafka.server.ReplicaFetcherThread.processPartitionData
  [16] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [17] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [19] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [20] scala.collection.mutable.ResizableArray.foreach
  [21] scala.collection.mutable.ResizableArray.foreach$
  [22] scala.collection.mutable.ArrayBuffer.foreach
  [23] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [24] kafka.server.AbstractFetcherThread.processFetchRequest
  [25] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [26] kafka.server.AbstractFetcherThread.maybeFetch
  [27] kafka.server.AbstractFetcherThread.doWork
  [28] kafka.utils.ShutdownableThread.run

--- 3236000 ns (11.49%), 3236 samples
  [ 0] java.util.TreeMap.successor
  [ 1] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 2] java.util.TreeMap$ValueIterator.next
  [ 3] 

[jira] [Updated] (KAFKA-9963) High CPU

2020-05-06 Thread Evan Williams (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evan Williams updated KAFKA-9963:
-
Description: 
When replacing a broker, with an empty data dir, and the same broker ID - we 
are seeing very high CPU usage during replication, generally up to 100% for 
some time, on a 4 VCPU (EC2 R5) host.  This is a 6 host cluster, with approx 
1000 topics and 3000 partitions.

 

There is of course traffic being served as well, as it catches up and becomes 
leader of partitions; however, due to the high replication CPU usage, clients 
start to have connection issues. Normal traffic on this host produces around 
40% CPU.

CPU profiling during this 'replace' scenario shows this:

 
{code:java}
 5473000   19.43% 5473  java.util.TreeMap$PrivateEntryIterator.nextEntry
 4975000   17.66% 4975  scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
 4417000   15.68% 4417  java.util.TreeMap.successor
 1773000    6.29% 1773  java.util.TreeMap$ValueIterator.next
 1700000    6.03% 1700  java.util.TreeMap$PrivateEntryIterator.hasNext
  601000    2.13%  601  scala.collection.convert.Wrappers$JIteratorWrapper.next
  516000    1.83%  516  writev



--- 3885000 ns (13.79%), 3885 samples
  [ 0] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 1] java.util.TreeMap$ValueIterator.next
  [ 2] scala.collection.convert.Wrappers$JIteratorWrapper.next
  [ 3] scala.collection.Iterator.find
  [ 4] scala.collection.Iterator.find$
  [ 5] scala.collection.AbstractIterator.find
  [ 6] scala.collection.IterableLike.find
  [ 7] scala.collection.IterableLike.find$
  [ 8] scala.collection.AbstractIterable.find
  [ 9] kafka.log.ProducerStateManager.lastStableOffset
  [10] kafka.log.Log.$anonfun$append$12
  [11] kafka.log.Log.$anonfun$append$2
  [12] kafka.log.Log.append
  [13] kafka.log.Log.appendAsFollower
  [14] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [15] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [16] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [17] kafka.server.ReplicaFetcherThread.processPartitionData
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [19] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [20] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [21] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [22] scala.collection.mutable.ResizableArray.foreach
  [23] scala.collection.mutable.ResizableArray.foreach$
  [24] scala.collection.mutable.ArrayBuffer.foreach
  [25] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [26] kafka.server.AbstractFetcherThread.processFetchRequest
  [27] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [28] kafka.server.AbstractFetcherThread.maybeFetch
  [29] kafka.server.AbstractFetcherThread.doWork
  [30] kafka.utils.ShutdownableThread.run

--- 3632000 ns (12.89%), 3632 samples
  [ 0] scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
  [ 1] scala.collection.Iterator.find
  [ 2] scala.collection.Iterator.find$
  [ 3] scala.collection.AbstractIterator.find
  [ 4] scala.collection.IterableLike.find
  [ 5] scala.collection.IterableLike.find$
  [ 6] scala.collection.AbstractIterable.find
  [ 7] kafka.log.ProducerStateManager.lastStableOffset
  [ 8] kafka.log.Log.$anonfun$append$12
  [ 9] kafka.log.Log.$anonfun$append$2
  [10] kafka.log.Log.append
  [11] kafka.log.Log.appendAsFollower
  [12] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [13] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [14] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [15] kafka.server.ReplicaFetcherThread.processPartitionData
  [16] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [17] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [19] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [20] scala.collection.mutable.ResizableArray.foreach
  [21] scala.collection.mutable.ResizableArray.foreach$
  [22] scala.collection.mutable.ArrayBuffer.foreach
  [23] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [24] kafka.server.AbstractFetcherThread.processFetchRequest
  [25] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [26] kafka.server.AbstractFetcherThread.maybeFetch
  [27] kafka.server.AbstractFetcherThread.doWork
  [28] kafka.utils.ShutdownableThread.run

--- 3236000 ns (11.49%), 3236 samples
  [ 0] java.util.TreeMap.successor
  [ 1] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 2] java.util.TreeMap$ValueIterator.next
  [ 3] scala.collection.convert.Wrappers$JIteratorWrapper.next
  [ 4] 

[jira] [Updated] (KAFKA-9963) High CPU

2020-05-06 Thread Evan Williams (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evan Williams updated KAFKA-9963:
-
Description: 
When replacing a broker, with an empty data dir, and the same broker ID - we 
are seeing very high CPU usage during replication, generally up to 100% for 
some time, on a 4 VCPU (EC2 R5) host.  This is a 6 host cluster, with approx 
1000 topics and 3000 partitions.

 

There is of course traffic being served as well, as it catches up and becomes 
leader of partitions; however, due to the high replication CPU usage, clients 
start to have connection issues.

CPU profiling during this 'replace' scenario shows this:

 
{code:java}
 5473000   19.43% 5473  java.util.TreeMap$PrivateEntryIterator.nextEntry
 4975000   17.66% 4975  scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
 4417000   15.68% 4417  java.util.TreeMap.successor
 1773000    6.29% 1773  java.util.TreeMap$ValueIterator.next
 1700000    6.03% 1700  java.util.TreeMap$PrivateEntryIterator.hasNext
  601000    2.13%  601  scala.collection.convert.Wrappers$JIteratorWrapper.next
  516000    1.83%  516  writev



--- 3885000 ns (13.79%), 3885 samples
  [ 0] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 1] java.util.TreeMap$ValueIterator.next
  [ 2] scala.collection.convert.Wrappers$JIteratorWrapper.next
  [ 3] scala.collection.Iterator.find
  [ 4] scala.collection.Iterator.find$
  [ 5] scala.collection.AbstractIterator.find
  [ 6] scala.collection.IterableLike.find
  [ 7] scala.collection.IterableLike.find$
  [ 8] scala.collection.AbstractIterable.find
  [ 9] kafka.log.ProducerStateManager.lastStableOffset
  [10] kafka.log.Log.$anonfun$append$12
  [11] kafka.log.Log.$anonfun$append$2
  [12] kafka.log.Log.append
  [13] kafka.log.Log.appendAsFollower
  [14] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [15] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [16] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [17] kafka.server.ReplicaFetcherThread.processPartitionData
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [19] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [20] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [21] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [22] scala.collection.mutable.ResizableArray.foreach
  [23] scala.collection.mutable.ResizableArray.foreach$
  [24] scala.collection.mutable.ArrayBuffer.foreach
  [25] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [26] kafka.server.AbstractFetcherThread.processFetchRequest
  [27] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [28] kafka.server.AbstractFetcherThread.maybeFetch
  [29] kafka.server.AbstractFetcherThread.doWork
  [30] kafka.utils.ShutdownableThread.run
--- 3632000 ns (12.89%), 3632 samples
  [ 0] scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
  [ 1] scala.collection.Iterator.find
  [ 2] scala.collection.Iterator.find$
  [ 3] scala.collection.AbstractIterator.find
  [ 4] scala.collection.IterableLike.find
  [ 5] scala.collection.IterableLike.find$
  [ 6] scala.collection.AbstractIterable.find
  [ 7] kafka.log.ProducerStateManager.lastStableOffset
  [ 8] kafka.log.Log.$anonfun$append$12
  [ 9] kafka.log.Log.$anonfun$append$2
  [10] kafka.log.Log.append
  [11] kafka.log.Log.appendAsFollower
  [12] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [13] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [14] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [15] kafka.server.ReplicaFetcherThread.processPartitionData
  [16] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [17] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [19] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [20] scala.collection.mutable.ResizableArray.foreach
  [21] scala.collection.mutable.ResizableArray.foreach$
  [22] scala.collection.mutable.ArrayBuffer.foreach
  [23] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [24] kafka.server.AbstractFetcherThread.processFetchRequest
  [25] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [26] kafka.server.AbstractFetcherThread.maybeFetch
  [27] kafka.server.AbstractFetcherThread.doWork
  [28] kafka.utils.ShutdownableThread.run
--- 3236000 ns (11.49%), 3236 samples
  [ 0] java.util.TreeMap.successor
  [ 1] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 2] java.util.TreeMap$ValueIterator.next
  [ 3] scala.collection.convert.Wrappers$JIteratorWrapper.next
  [ 4] scala.collection.Iterator.find
  [ 5] scala.collection.Iterator.find$
  [ 6] 
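  // The frames above bottom out in kafka.log.ProducerStateManager.lastStableOffset,
  // which walks the ongoing transactions through a TreeMap value iterator on every
  // follower append. A paraphrased sketch of that shape (simplified stand-in types,
  // not the exact 2.4.1 source):
  import java.util.TreeMap;

  class LastStableOffsetSketch {
      static class TxnMetadata { long producerId; long firstOffset; }
      static class CompletedTxn { long producerId; long lastOffset; }

      final TreeMap<Long, TxnMetadata> ongoingTxns = new TreeMap<>();

      long lastStableOffset(CompletedTxn completedTxn) {
          for (TxnMetadata txn : ongoingTxns.values()) {      // TreeMap.successor / ValueIterator.next
              if (txn.producerId != completedTxn.producerId)  // the linear "find" in the profile
                  return txn.firstOffset;
          }
          return completedTxn.lastOffset + 1;                 // no other transaction is ongoing
      }
  }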

[jira] [Created] (KAFKA-9963) High CPU

2020-05-06 Thread Evan Williams (Jira)
Evan Williams created KAFKA-9963:


 Summary: High CPU
 Key: KAFKA-9963
 URL: https://issues.apache.org/jira/browse/KAFKA-9963
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.4.1
Reporter: Evan Williams


When replacing a broker with an empty data dir and the same broker ID, we are 
seeing very high CPU usage during replication, generally up to 100% for some 
time, on a 4 vCPU (EC2 R5) host. This is a 6-host cluster with approx 1000 
topics and 3000 partitions.

 

There is of course traffic being served as well, as the broker catches up and 
becomes leader of partitions; however, due to the high replication CPU usage, 
clients start to have connection issues.

CPU profiling during this 'replace' scenario shows this:

 
{code:java}

 5473000   19.43% 5473  java.util.TreeMap$PrivateEntryIterator.nextEntry
 4975000   17.66% 4975  scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
 4417000   15.68% 4417  java.util.TreeMap.successor
 1773000    6.29% 1773  java.util.TreeMap$ValueIterator.next
 1700000    6.03% 1700  java.util.TreeMap$PrivateEntryIterator.hasNext
  601000    2.13%  601  scala.collection.convert.Wrappers$JIteratorWrapper.next
  516000    1.83%  516  writev
--- 3885000 ns (13.79%), 3885 samples
  [ 0] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 1] java.util.TreeMap$ValueIterator.next
  [ 2] scala.collection.convert.Wrappers$JIteratorWrapper.next
  [ 3] scala.collection.Iterator.find
  [ 4] scala.collection.Iterator.find$
  [ 5] scala.collection.AbstractIterator.find
  [ 6] scala.collection.IterableLike.find
  [ 7] scala.collection.IterableLike.find$
  [ 8] scala.collection.AbstractIterable.find
  [ 9] kafka.log.ProducerStateManager.lastStableOffset
  [10] kafka.log.Log.$anonfun$append$12
  [11] kafka.log.Log.$anonfun$append$2
  [12] kafka.log.Log.append
  [13] kafka.log.Log.appendAsFollower
  [14] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [15] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [16] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [17] kafka.server.ReplicaFetcherThread.processPartitionData
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [19] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [20] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [21] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [22] scala.collection.mutable.ResizableArray.foreach
  [23] scala.collection.mutable.ResizableArray.foreach$
  [24] scala.collection.mutable.ArrayBuffer.foreach
  [25] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [26] kafka.server.AbstractFetcherThread.processFetchRequest
  [27] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [28] kafka.server.AbstractFetcherThread.maybeFetch
  [29] kafka.server.AbstractFetcherThread.doWork
  [30] kafka.utils.ShutdownableThread.run
--- 3632000 ns (12.89%), 3632 samples
  [ 0] scala.collection.convert.Wrappers$JIteratorWrapper.hasNext
  [ 1] scala.collection.Iterator.find
  [ 2] scala.collection.Iterator.find$
  [ 3] scala.collection.AbstractIterator.find
  [ 4] scala.collection.IterableLike.find
  [ 5] scala.collection.IterableLike.find$
  [ 6] scala.collection.AbstractIterable.find
  [ 7] kafka.log.ProducerStateManager.lastStableOffset
  [ 8] kafka.log.Log.$anonfun$append$12
  [ 9] kafka.log.Log.$anonfun$append$2
  [10] kafka.log.Log.append
  [11] kafka.log.Log.appendAsFollower
  [12] 
kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1
  [13] kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica
  [14] kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica
  [15] kafka.server.ReplicaFetcherThread.processPartitionData
  [16] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7
  [17] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6
  [18] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted
  [19] kafka.server.AbstractFetcherThread$$Lambda$552.191789933.apply
  [20] scala.collection.mutable.ResizableArray.foreach
  [21] scala.collection.mutable.ResizableArray.foreach$
  [22] scala.collection.mutable.ArrayBuffer.foreach
  [23] kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5
  [24] kafka.server.AbstractFetcherThread.processFetchRequest
  [25] kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3
  [26] kafka.server.AbstractFetcherThread.maybeFetch
  [27] kafka.server.AbstractFetcherThread.doWork
  [28] kafka.utils.ShutdownableThread.run
--- 3236000 ns (11.49%), 3236 samples
  [ 0] java.util.TreeMap.successor
  [ 1] java.util.TreeMap$PrivateEntryIterator.nextEntry
  [ 2] java.util.TreeMap$ValueIterator.next
  [ 3] 

[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-05-06 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17101215#comment-17101215
 ] 

Georgi Petkov commented on KAFKA-9921:
--

Maybe we can add the same information to the WindowStore#put method as well.

It's a personal style preference, but when there are short-circuit checks whose 
cases need only trivial or no handling, I would put them at the beginning of 
the method instead of adding more branching to the rest of the logic. So I 
would write:
{code:java}
if (some corner case) {
doSomething();
return;
}{code}
instead of:
{code:java}
if (some corner case) {
doSomething();
} else {
// nested code
if (...) {
...
} else {
...
}
}{code}
It's kind of hard to explain. See 
[this|https://softwareengineering.stackexchange.com/questions/18454/should-i-return-from-a-function-early-or-use-an-if-statement]
 question and InMemoryWindowStore#put.
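
Applied to this case, a hypothetical put (standalone sketch, not the actual 
store code) would read:
{code:java}
class EarlyReturnPutSketch {
    private final boolean retainDuplicates;
    private final java.util.Map<String, byte[]> backing = new java.util.HashMap<>();

    EarlyReturnPutSketch(final boolean retainDuplicates) {
        this.retainDuplicates = retainDuplicates;
    }

    void put(final String key, final byte[] value, final long windowStartTimestamp) {
        if (value == null && retainDuplicates) {
            return; // corner case exits up front; no else-nesting below
        }
        // main logic stays at a single indentation level
        backing.put(key + "@" + windowStartTimestamp, value);
    }
}{code}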

I haven't checked, but I would guess that the behavior with null values and 
retainDuplicates has no explicit tests. If that is the case, you could add some.

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior is due to having the last value in the cache (and it 
> can have only one since it's not aware of the retain duplicates option) and 
> it is read first (while skipping the first from the RocksDB iterator even 
> though the values are different). This can be observed (for version 2.5.0) in 
> _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 
> values are read from the RocksDB iterator so they are as expected.
> As I said, the whole feature is not considering the _retainDuplicates_ option 
> so there are other examples of incorrect behavior like in 
> _AbstractMergedSortedCacheStoreIterator__#peekNextKey()_ - for each call, you 
> would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I was always reading the current list appending one value and writing 
> it back). So I go for _List<V>_ instead of _KeyValuePair<K, List<V>>_. The whole use case is more complex than that, so I use 
> _#transformValues_ and state stores.
> So as an impact I can't use caching on my state stores. For others - they'll 
> have incorrect behavior that may take a lot of time to be discovered and even 
> more time to fix the results.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bdbyrne opened a new pull request #8628: KAFKA-9942: Fixes ConfigCommand client quotas w/ default users.

2020-05-06 Thread GitBox


bdbyrne opened a new pull request #8628:
URL: https://github.com/apache/kafka/pull/8628


   Fixes ConfigCommand client quotas w/ default users when using 
--bootstrap-servers.
   
   Test is expanded to handle all valid (user, client-id) enumerations.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #8379: KAFKA-9780: Deprecate commit records without record metadata

2020-05-06 Thread GitBox


rhauch commented on pull request #8379:
URL: https://github.com/apache/kafka/pull/8379#issuecomment-624894538


   Still waiting for final approval of 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-586%3A+Deprecate+commit+records+without+record+metadata



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #2604: KAFKA-4794: Add access to OffsetStorageReader from SourceConnector

2020-05-06 Thread GitBox


rhauch commented on pull request #2604:
URL: https://github.com/apache/kafka/pull/2604#issuecomment-624891952


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-05-06 Thread GitBox


C0urante commented on a change in pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#discussion_r421091305



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -536,20 +561,37 @@ private void processConnectorConfigUpdates(Set<String> connectorConfigUpdates) {
 // If we only have connector config updates, we can just bounce the 
updated connectors that are
 // currently assigned to this worker.
  Set<String> localConnectors = assignment == null ? Collections.emptySet() : new HashSet<>(assignment.connectors());
+log.trace(
+"Processing connector config updates; "
++ "currently-owned connectors are {}, and to-be-updated 
connectors are {}",
+localConnectors,
+connectorConfigUpdates
+);
 for (String connectorName : connectorConfigUpdates) {
-if (!localConnectors.contains(connectorName))
+if (!localConnectors.contains(connectorName)) {
+log.trace(
+"Skipping config update for connector {} as it is not 
owned by this worker",
+connectorName
+);
 continue;
+}
 boolean remains = configState.contains(connectorName);
 log.info("Handling connector-only config update by {} connector 
{}",
 remains ? "restarting" : "stopping", connectorName);
-worker.stopConnector(connectorName);
+worker.stopAndAwaitConnector(connectorName);
 // The update may be a deletion, so verify we actually need to 
restart the connector
 if (remains)
-startConnector(connectorName);
+startConnector(connectorName, (error, result) -> { });

Review comment:
   Agh, you're right. Thought I'd committed that but apparently not. Will 
add.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-05-06 Thread GitBox


C0urante commented on a change in pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#discussion_r421091035



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
##
@@ -239,9 +236,10 @@ private synchronized void putConnectorConfig(String 
connName,
 }
 
 requestExecutorService.submit(() -> {
-synchronized (this) {
-updateConnectorTasks(connName);
-}
+updateConnectorTasks(connName);
+// synchronized (this) {
+// updateConnectorTasks(connName);
+// }

Review comment:
   🤦‍♀️ thanks





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gharris1727 commented on a change in pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-05-06 Thread GitBox


gharris1727 commented on a change in pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#discussion_r421079889



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
##
@@ -239,9 +236,10 @@ private synchronized void putConnectorConfig(String 
connName,
 }
 
 requestExecutorService.submit(() -> {
-synchronized (this) {
-updateConnectorTasks(connName);
-}
+updateConnectorTasks(connName);
+// synchronized (this) {
+// updateConnectorTasks(connName);
+// }

Review comment:
   nit: leftover comments

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -536,20 +561,37 @@ private void processConnectorConfigUpdates(Set<String> connectorConfigUpdates) {
 // If we only have connector config updates, we can just bounce the 
updated connectors that are
 // currently assigned to this worker.
  Set<String> localConnectors = assignment == null ? Collections.emptySet() : new HashSet<>(assignment.connectors());
+log.trace(
+"Processing connector config updates; "
++ "currently-owned connectors are {}, and to-be-updated 
connectors are {}",
+localConnectors,
+connectorConfigUpdates
+);
 for (String connectorName : connectorConfigUpdates) {
-if (!localConnectors.contains(connectorName))
+if (!localConnectors.contains(connectorName)) {
+log.trace(
+"Skipping config update for connector {} as it is not 
owned by this worker",
+connectorName
+);
 continue;
+}
 boolean remains = configState.contains(connectorName);
 log.info("Handling connector-only config update by {} connector 
{}",
 remains ? "restarting" : "stopping", connectorName);
-worker.stopConnector(connectorName);
+worker.stopAndAwaitConnector(connectorName);
 // The update may be a deletion, so verify we actually need to 
restart the connector
 if (remains)
-startConnector(connectorName);
+startConnector(connectorName, (error, result) -> { });

Review comment:
   The caller of this function doesn't need to block on starting the 
connector, but I think it should log errors inside the callback; otherwise 
they're going to get swallowed.
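
   For example, something like (hypothetical sketch):

   startConnector(connectorName, (error, result) -> {
       if (error != null)
           log.error("Failed to start connector {}", connectorName, error);
   });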

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1182,31 +1245,49 @@ public Void call() throws Exception {
 
 // Helper for starting a connector with the given name, which will extract 
& parse the config, generate connector
 // context and add to the worker. This needs to be called from within the 
main worker thread for this herder.
-private boolean startConnector(String connectorName) {
+// The callback is invoked after the connector has finished startup and 
generated task configs, or failed in the process.
+private void startConnector(String connectorName, Callback<TargetState> callback) {
 log.info("Starting connector {}", connectorName);
  final Map<String, String> configProps = configState.connectorConfig(connectorName);
-final ConnectorContext ctx = new HerderConnectorContext(this, 
connectorName);
+final CloseableConnectorContext ctx = new HerderConnectorContext(this, 
connectorName);
 final TargetState initialState = 
configState.targetState(connectorName);
-boolean started = worker.startConnector(connectorName, configProps, 
ctx, this, initialState);
-
-// Immediately request configuration since this could be a brand new 
connector. However, also only update those
-// task configs if they are actually different from the existing ones 
to avoid unnecessary updates when this is
-// just restoring an existing connector.
-if (started && initialState == TargetState.STARTED)
-reconfigureConnectorTasksWithRetry(time.milliseconds(), 
connectorName);
+final Callback<TargetState> onInitialStateChange = (error, newState) -> {
+if (error != null) {
+callback.onCompletion(new ConnectException("Failed to start 
connector: " + connectorName), null);
+return;
+}
 
-return started;
+// Use newState here in case the connector has been paused right 
after being created
+if (newState == TargetState.STARTED) {
+addRequest(
+new Callable<Void>() {
+@Override
+public Void call() {
+// Request configuration 

[jira] [Updated] (KAFKA-9780) Deprecate commit records without record metadata

2020-05-06 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9780:
-
Fix Version/s: 2.6.0

> Deprecate commit records without record metadata
> 
>
> Key: KAFKA-9780
> URL: https://issues.apache.org/jira/browse/KAFKA-9780
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.4.1
>Reporter: Mario Molina
>Assignee: Mario Molina
>Priority: Minor
> Fix For: 2.6.0
>
>
> Since KIP-382 (MirrorMaker 2.0) a new method {{commitRecord}} was included in 
> {{SourceTask}} class to be called by the worker adding a new parameter with 
> the record metadata. The old {{commitRecord}} method is called and from the 
> new one and it's preserved just for backwards compatibility.
> The idea is to deprecate this method so that we could remove it in a future 
> release.
> There is a KIP for this ticket: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-586%3A+Deprecate+commit+records+without+record+metadata]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc edited a comment on pull request #8610: KAFKA-9942: --entity-default flag is not working for altering / describing configs in AdminClient

2020-05-06 Thread GitBox


d8tltanc edited a comment on pull request #8610:
URL: https://github.com/apache/kafka/pull/8610#issuecomment-624421790


   I think `ClientQuotaEntity` is only used in client quota alteration, but the 
description doesn't use it.
   Update: I've pushed my latest local changes if that helps. My patch assumes 
the server treats a null `match` value as the default. The original design 
might have assumed a `match` value of Optional.empty() as the default, so there 
may be some inconsistencies. The failing tests are 
**testDescribeClientQuotasMatchExact** and 
**testDescribeClientQuotasMatchPartial**
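
   For reference, a sketch of the distinction in play, using the KIP-546 admin 
API shape (assuming its final surface; `admin` here is a hypothetical `Admin` 
instance): a null match value selects the *default* entity, while omitting the 
component entirely matches *any* entity of that type.

   import java.util.Arrays;
   import org.apache.kafka.common.quota.ClientQuotaEntity;
   import org.apache.kafka.common.quota.ClientQuotaFilter;
   import org.apache.kafka.common.quota.ClientQuotaFilterComponent;

   ClientQuotaFilterComponent exactUser   = ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "alice");
   ClientQuotaFilterComponent defaultUser = ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.USER);
   ClientQuotaFilterComponent anyUser     = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER);
   // e.g. describe only the default user quota:
   admin.describeClientQuotas(ClientQuotaFilter.contains(Arrays.asList(defaultUser)));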



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #8627: MINOR - Increase Trogdor Histogram buckets for latency to 10000ms

2020-05-06 Thread GitBox


cmccabe commented on pull request #8627:
URL: https://github.com/apache/kafka/pull/8627#issuecomment-624872970


   LGTM



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] scott-hendricks opened a new pull request #8627: MINOR - Increase Trogdor Histogram buckets for latency to 10000ms

2020-05-06 Thread GitBox


scott-hendricks opened a new pull request #8627:
URL: https://github.com/apache/kafka/pull/8627


   The current latency histograms for ProduceBenchWorker and ConsumeBenchWorker 
are limited to 5000ms, causing the histograms to truncate and report 5000ms on 
requests that take longer. This increases the maximum latency the histogram 
accepts to 10000ms.
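
   For context, the truncation works like this (illustrative only, not the 
actual Trogdor histogram code): a bounded histogram clamps samples above its 
maximum into the top bucket, so the recorded value for a slow request is the 
cap itself.

   // e.g. with maxLatencyMs = 5000, a 7000 ms request was recorded as 5000 ms
   static int clampToHistogramRange(int latencyMs, int maxLatencyMs) {
       return Math.min(latencyMs, maxLatencyMs);
   }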



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc edited a comment on pull request #8610: KAFKA-9942: --entity-default flag is not working for altering / describing configs in AdminClient

2020-05-06 Thread GitBox


d8tltanc edited a comment on pull request #8610:
URL: https://github.com/apache/kafka/pull/8610#issuecomment-624421790


   I think `ClientQuotaEntity` is only used in client quota alteration, but the 
description doesn't use it.
   Update: I've pushed my latest local changes if that helps



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc edited a comment on pull request #8615: KAFKA-9954: Config command didn't validate the unsupported user config change

2020-05-06 Thread GitBox


d8tltanc edited a comment on pull request #8615:
URL: https://github.com/apache/kafka/pull/8615#issuecomment-624857586


   Thanks for the review. Since we are adding a unit test, I moved the argument 
checking into checkArgs().
   Edit: Since the existing tests assume that the alter-user entity type is 
valid with --bootstrap-server, I reverted the change to its original place and 
tested via the command entry point, main().



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8616: KAFKA-9127: don't create StreamThreads for global-only topology (2.4)

2020-05-06 Thread GitBox


ableegoldman commented on pull request #8616:
URL: https://github.com/apache/kafka/pull/8616#issuecomment-624859671


   @vvcephei  Both Java 11 builds passed, Java 8 failed with flaky 
   `DynamicBrokerReconfigurationTest.testAddRemoveSaslListeners`
   `BranchedMultiLevelRepartitionConnectedTopologyTest.testTopologyBuild`
   
   (the Streams one is not related, it was converted to a unit test to reduce 
flakiness but that PR was not backported to 2.4)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] andrewchoi5 commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-05-06 Thread GitBox


andrewchoi5 commented on pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#issuecomment-624858200


   @mjsax Hi Matthias, would you happen to know if there are any other 
reviewers available? I don't mind waiting, but I was curious what the ETA 
usually is.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on pull request #8615: KAFKA-9954: Config command didn't validate the unsupported user config change

2020-05-06 Thread GitBox


d8tltanc commented on pull request #8615:
URL: https://github.com/apache/kafka/pull/8615#issuecomment-624857586


   Thanks for the review. Since we are adding a unit test, I moved the argument 
checking into checkArgs().



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-05-06 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17101130#comment-17101130
 ] 

Sophie Blee-Goldman commented on KAFKA-9921:


Thanks for the reminder, just opened [this small 
PR|https://github.com/apache/kafka/pull/8626] to add back in the test and 
hopefully clear up the handling of null values. Please take a look when you get 
the chance.

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior is due to having the last value in the cache (and it 
> can have only one since it's not aware of the retain duplicates option) and 
> it is read first (while skipping the first from the RocksDB iterator even 
> though the values are different). This can be observed (for version 2.5.0) in 
> _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 
> values are read from the RocksDB iterator so they are as expected.
> As I said, the whole feature is not considering the _retainDuplicates_ option 
> so there are other examples of incorrect behavior like in 
> _AbstractMergedSortedCacheStoreIterator__#peekNextKey()_ - for each call, you 
> would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I was always reading the current list appending one value and writing 
> it back). So I go for _List<V>_ instead of _KeyValuePair<K, List<V>>_. The whole use case is more complex than that, so I use 
> _#transformValues_ and state stores.
> So as an impact I can't use caching on my state stores. For others - they'll 
> have incorrect behavior that may take a lot of time to be discovered and even 
> more time to fix the results.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman opened a new pull request #8626: KAFKA-9921: explicit handling of null values with retainDuplicates

2020-05-06 Thread GitBox


ableegoldman opened a new pull request #8626:
URL: https://github.com/apache/kafka/pull/8626


   In general the behavior of window stores with `retainDuplicates` is not well 
documented or enforced, so we should attempt to clarify things better in the 
javadocs and in the code itself. This explicitly skips the put/delete when the 
value is null and duplicates are allowed, and specifies this behavior in the 
docs.
   
   Also adds in a test I left out in the earlier PR 
https://github.com/apache/kafka/pull/8564
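
   Behavior sketch of the change (assumed from the description above, not the 
patch itself): with duplicates retained, a null value becomes a no-op instead 
of a delete.

   windowStore.put(key, 1, windowStartTimestamp);
   windowStore.put(key, null, windowStartTimestamp);  // skipped: nothing is removed
   try (WindowStoreIterator<Integer> iter = windowStore.fetch(key, windowStartTimestamp, windowStartTimestamp)) {
       // still yields the value 1 put above
   }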



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #8625: MINOR: Only add 'Data' suffix for generated request/response/header types

2020-05-06 Thread GitBox


cmccabe commented on pull request #8625:
URL: https://github.com/apache/kafka/pull/8625#issuecomment-624851509


   LGTM



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-6342) Remove workaround for JSON parsing of non-escaped strings

2020-05-06 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-6342:
---
Summary: Remove workaround for JSON parsing of non-escaped strings  (was: 
Move workaround for JSON parsing of non-escaped strings)

> Remove workaround for JSON parsing of non-escaped strings
> -
>
> Key: KAFKA-6342
> URL: https://issues.apache.org/jira/browse/KAFKA-6342
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Viktor Somogyi-Vass
>Priority: Major
> Fix For: 2.6.0
>
>
> KAFKA-6319 added a workaround to parse invalid JSON persisted using older 
> versions of Kafka because special characters were not escaped. The workaround 
> is required in 1.0.1 to enable parsing invalid JSON from ACL configs in 
> ZooKeeper. We can move the workaround out of kafka.utils.Json#parseFull for 
> 1.1.0 so that it is applied only to ACLs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-6342) Move workaround for JSON parsing of non-escaped strings

2020-05-06 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-6342:
--

Fix Version/s: 2.6.0
 Assignee: Viktor Somogyi-Vass  (was: Umesh Chaudhary)
   Resolution: Fixed

We found that the compatibility logic had not been used since 1.1, so we 
ultimately decided to remove it.

> Move workaround for JSON parsing of non-escaped strings
> ---
>
> Key: KAFKA-6342
> URL: https://issues.apache.org/jira/browse/KAFKA-6342
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Viktor Somogyi-Vass
>Priority: Major
> Fix For: 2.6.0
>
>
> KAFKA-6319 added a workaround to parse invalid JSON persisted using older 
> versions of Kafka because special characters were not escaped. The workaround 
> is required in 1.0.1 to enable parsing invalid JSON from ACL configs in 
> ZooKeeper. We can move the workaround out of kafka.utils.Json#parseFull for 
> 1.1.0 so that it is applied only to ACLs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #8602: KAFKA-9947; Ensure proper shutdown of components in `TransactionsBounceTest`

2020-05-06 Thread GitBox


ijuma commented on pull request #8602:
URL: https://github.com/apache/kafka/pull/8602#issuecomment-624845577


   Sounds good.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8585: KAFKA-9938; Debug consumer should be able to fetch from followers

2020-05-06 Thread GitBox


ijuma commented on a change in pull request #8585:
URL: https://github.com/apache/kafka/pull/8585#discussion_r421039469



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -949,8 +949,11 @@ class ReplicaManager(val config: KafkaConfig,
 else
   FetchHighWatermark
 
-// Restrict fetching to leader if request is from follower or from a 
client with older version (no ClientMetadata)
-val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && 
clientMetadata.isEmpty)
+// Restrict fetching to leader if request is from follower or from an 
ordinary consumer
+// with an older version (which is implied by no ClientMetadata)

Review comment:
   Sounds good.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-05-06 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1710#comment-1710
 ] 

Georgi Petkov commented on KAFKA-9921:
--

[~ableegoldman] Did you get to read my last comment? At least the bolded text?

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior is due to having the last value in the cache (and it 
> can have only one since it's not aware of the retain duplicates option) and 
> it is read first (while skipping the first from the RocksDB iterator even 
> though the values are different). This can be observed (for version 2.5.0) in 
> _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 
> values are read from the RocksDB iterator so they are as expected.
> As I said, the whole feature is not considering the _retainDuplicates_ option 
> so there are other examples of incorrect behavior like in 
> _AbstractMergedSortedCacheStoreIterator__#peekNextKey()_ - for each call, you 
> would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I was always reading the current list appending one value and writing 
> it back). So I go for _List<V>_ instead of _KeyValuePair<K, List<V>>_. The whole use case is more complex than that, so I use 
> _#transformValues_ and state stores.
> So as an impact I can't use caching on my state stores. For others - they'll 
> have incorrect behavior that may take a lot of time to be discovered and even 
> more time to fix the results.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #8602: KAFKA-9947; Ensure proper shutdown of components in `TransactionsBounceTest`

2020-05-06 Thread GitBox


hachikuji commented on pull request #8602:
URL: https://github.com/apache/kafka/pull/8602#issuecomment-624842318


   I changed the patch to set the request timeout to the same value as the 
delivery timeout for this test case, which was the old behavior.
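
   For reference, the shape of that change (sketch; the test's actual wiring 
may differ):

   // align the producer's request timeout with its delivery timeout, as before
   props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
   props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);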



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #8230: KAFKA-9667: Connect JSON serde strip trailing zeros

2020-05-06 Thread GitBox


rhauch commented on pull request #8230:
URL: https://github.com/apache/kafka/pull/8230#issuecomment-624837748


   okay to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #8625: MINOR: Only add 'Data' suffix for generated request/response/header types

2020-05-06 Thread GitBox


hachikuji opened a new pull request #8625:
URL: https://github.com/apache/kafka/pull/8625


   Currently we add "Data" to all generated classnames in order to avoid naming 
collisions. Generated classes for other persistent schema definitions (such as 
those used in `GroupCoordinator` and `TransactionCoordinator`) will not 
necessarily have the same problem, so it would be nice if the generated types 
could use the name defined in the schema directly.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #8585: KAFKA-9938; Debug consumer should be able to fetch from followers

2020-05-06 Thread GitBox


hachikuji commented on a change in pull request #8585:
URL: https://github.com/apache/kafka/pull/8585#discussion_r421026806



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -949,8 +949,11 @@ class ReplicaManager(val config: KafkaConfig,
 else
   FetchHighWatermark
 
-// Restrict fetching to leader if request is from follower or from a 
client with older version (no ClientMetadata)
-val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && 
clientMetadata.isEmpty)
+// Restrict fetching to leader if request is from follower or from an 
ordinary consumer
+// with an older version (which is implied by no ClientMetadata)

Review comment:
   If there are any use cases which rely on it, users can upgrade their 
clients to regain the ability. Since no one has evidently noticed that this 
functionality was lost, I think it's probably fine to remove it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9112) Combine streams `onAssignment` with `partitionsAssigned` task creation

2020-05-06 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-9112.

Fix Version/s: 2.6.0
 Assignee: Guozhang Wang
   Resolution: Fixed

I think this issue was addressed as part of "The Refactor".

> Combine streams `onAssignment` with `partitionsAssigned` task creation
> --
>
> Key: KAFKA-9112
> URL: https://issues.apache.org/jira/browse/KAFKA-9112
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> Task manager needs to call `createTasks` inside the partitionsAssigned 
> callback, which runs after the assignor's `onAssignment` callback. This means 
> during task creation we rely on status changes based on intermediate data 
> structures populated by a different callback, which is hard to reason about. 
> We should consider consolidating the logic into one of the callbacks, 
> preferably `onAssignment` as it contains the full information needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-6063) StreamsException is thrown after the changing `partitions`

2020-05-06 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-6063.

Resolution: Not A Problem

> StreamsException is thrown after the changing `partitions`
> --
>
> Key: KAFKA-6063
> URL: https://issues.apache.org/jira/browse/KAFKA-6063
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: macOS 10.12
> kafka 0.11.0.1
>Reporter: Akihito Nakano
>Priority: Trivial
>  Labels: user-experience
>
> Hi.
> "org.apache.kafka.streams.errors.StreamsException" is thrown in following 
> case.
> h3. Create topic
> {code:java}
> $ bin/kafka-topics.sh --create --zookeeper localhost:2181 
> --replication-factor 1 --partitions 6 --topic word-count-input
> {code}
> h3. Create Kafka Streams Application
> {code:java}
> public class WordCountApp {
> public static void main(String[] args) {
> Properties config = new Properties();
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "wordcount-application");
> ...
> ...
> {code}
> h3.  Ensure that it works fine
> {code:java}
> $ java -jar wordcount.jar
> KafkaStreams processID: b4a559cb-7075-4ece-a718-5043a432900b
> StreamsThread appId: wordcount-application
> ...
> ...
> {code}
> h3.  Change "partitions"
> {code:java}
> $ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 8 
> --topic word-count-input
> Adding partitions succeeded!
> {code}
> h3.  When I start Application, StreamsException is thrown
> {code:java}
> $ java -jar wordcount.jar
> KafkaStreams processID: 8a9cbf03-b841-4cb2-9d44-6456b4520522
> StreamsThread appId: wordcount-applicationn
> StreamsThread clientId: 
> wordcount-applicationn-8a9cbf03-b841-4cb2-9d44-6456b4520522
> StreamsThread threadId: 
> wordcount-applicationn-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1
> Active tasks:
> Running:
> Suspended:
> Restoring:
> New:
> Standby tasks:
> Running:
> Suspended:
> Restoring:
> New:
> Exception in thread 
> "wordcount-application-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: Could not create internal 
> topics.
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:82)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:660)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:398)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:365)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:522)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
>   at 
> 

[jira] [Resolved] (KAFKA-8858) Kafka Streams - Failed to Rebalance Error and stream consumer stuck for some reason

2020-05-06 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-8858.

  Assignee: Guozhang Wang
Resolution: Duplicate

> Kafka Streams - Failed to Rebalance Error and stream consumer stuck for some 
> reason
> ---
>
> Key: KAFKA-8858
> URL: https://issues.apache.org/jira/browse/KAFKA-8858
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
> Environment: Apache Kafka 2.1.1
>Reporter: Ante B.
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: Stream, consumer, corrupt, offset, rebalance, 
> transactions
>
> I have a basic Kafka Streams application that reads from a {{topic}}, 
> performs a rolling aggregate, and performs a join to publish to an 
> {{agg_topic}}. Our project hits the timeout failure in a Kafka 2.1.1 
> environment and we don't know the reason yet.
> Our stream consumer got stuck for some reason. 
> After we changed our group id to another one it became normal, so it seems 
> the offset data for this consumer is corrupted.
> Can you please help us resolve this problem so that we can revert to the 
> previous consumer name? We have many inconveniences due to this.
> Ping me if you need some additional info.
> Our temporary workaround is to disable the {{exactly_once}} config, which 
> skips initializing the transactional state. We also reset the offset for the 
> corrupted partition, with no effect.
> Full problem description in log:
> {code:java}
> [2019-08-30 14:20:02.168] [abc-streamer-StreamThread-21] ERROR 
> org.apache.kafka.streams.processor.internals.StreamThread:273 - stream-thread 
> [abc-streamer-StreamThread-21] Error caught during partition assignment, will 
> abort the current process and re-throw at the end of rebalance: {} 
>  org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
> initializing transactional state in 60000ms.
> [2019-08-30 14:21:35.407] [abc-streamer-StreamThread-14] ERROR 
> org.apache.kafka.streams.processor.internals.StreamThread:273 - stream-thread 
> [abc-streamer-StreamThread-14] Error caught during partition assignment, will 
> abort the current process and re-throw at the end of rebalance: {} 
>  org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
> initializing transactional state in 60000ms.
> [2019-08-30 14:22:58.487] [abc-streamer-StreamThread-13] ERROR 
> org.apache.kafka.streams.processor.internals.StreamThread:273 - stream-thread 
> [abc-streamer-StreamThread-13] Error caught during partition assignment, will 
> abort the current process and re-throw at the end of rebalance: {} 
>  org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
> initializing transactional state in 60000ms.
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8207) StickyPartitionAssignor for KStream

2020-05-06 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-8207.

Resolution: Not A Problem

Closing this as "Not a Problem" as the partition assignor can't be overridden by 
design. The assignment of tasks to clients should be sticky, in particular it 
should not return the exact same assignment in the case of a simple restart as 
mentioned below.

> StickyPartitionAssignor for KStream
> ---
>
> Key: KAFKA-8207
> URL: https://issues.apache.org/jira/browse/KAFKA-8207
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: neeraj
>Priority: Major
>
> In KStreams I am not able to give a sticky partition assignor or my custom 
> partition assignor.
> Overriding the property while building the streams config does not work:
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> CustomAssignor.class.getName());
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on pull request #8360: KAFKA-9768: Fix handling of rest.advertised.listener config

2020-05-06 Thread GitBox


kkonstantine commented on pull request #8360:
URL: https://github.com/apache/kafka/pull/8360#issuecomment-624821936


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gharris1727 commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

2020-05-06 Thread GitBox


gharris1727 commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420998369



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##
@@ -856,6 +858,47 @@ public void run() {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testSinkTasksHandleCloseErrors() throws Exception {

Review comment:
   I've written the test as described, and verified that the exception from 
close is caught by WorkerTask::doRun.
   
   However, it never gets wrapped in a ConnectException like exceptions from 
the other connector methods are, but I'm not sure that it's in scope to change 
this in this PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2020-05-06 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-4969.

Fix Version/s: (was: 1.1.0)
   2.6.0
   Resolution: Fixed

This has been resolved as part of KIP-441. The new assignor should distribute 
tasks of each type evenly.

> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
> Fix For: 2.6.0
>
>
> Currently, {{StreamPartitionsAssigner}} does not distinguish different 
> "types" of tasks. For example, a task can be stateless or have one or 
> multiple stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen 
> that one instance processes both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types" 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs standby task
>  - in the case of standby tasks, which tasks have progressed the most with 
> respect to restoration
> This improvement should be backed by a design document in the project wiki 
> (no KIP required though) as it's a fairly complex change.
>  
> There have been some additional discussions around task assignment on a 
> related PR https://github.com/apache/kafka/pull/5390



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] gharris1727 commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

2020-05-06 Thread GitBox


gharris1727 commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r421009491



##
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##
@@ -856,6 +858,47 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {

Review comment:
   I added that test, so please look at the diff.
   
   By wrapping, I'm completely disregarding the behavior from the connector; 
I'm only discussing the additional layers of exceptions added by the framework 
before printing.
   
   At the moment, the RuntimeException from put is wrapped in a 
ConnectException, but the RuntimeException from close never is. I'm questioning 
whether this is the ideal behavior, and whether we should add that wrapping 
layer or consider it out of scope for this PR.
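
For concreteness, a minimal self-contained sketch of the suppression behavior 
under discussion; the QuietClosable helper and all class names here are 
assumptions modeled on the PR, not the actual framework code. The put() failure 
stays primary and the close() failure rides along as suppressed.

public class SuppressedCloseDemo {

    @FunctionalInterface
    interface QuietClosable extends AutoCloseable {
        @Override
        void close(); // no checked exception, mirroring the PR's helper
    }

    static void closePartitions() {
        throw new RuntimeException("close failed");
    }

    static void put() {
        throw new RuntimeException("put failed");
    }

    public static void main(String[] args) {
        try (QuietClosable ignored = SuppressedCloseDemo::closePartitions) {
            put();
        } catch (RuntimeException e) {
            // Prints "put failed"; the close() failure is attached, not lost.
            System.out.println("primary:    " + e.getMessage());
            for (Throwable s : e.getSuppressed()) {
                System.out.println("suppressed: " + s.getMessage());
            }
        }
    }
}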





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

2020-05-06 Thread GitBox


kkonstantine commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r421005156



##
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##
@@ -856,6 +858,47 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {

Review comment:
   Let me see if I understand what you are describing as `wrapped`. 
   My use case is as follows: 
   `SinkTask#close` attempts to release resources, and if it fails it throws a 
`ConnectException`, as we'd expect connector developers to do (currently it 
throws a `RuntimeException`, which might be less representative). 
   
   With your fix, this exception can appear as suppressed when an exception 
happens in `SinkTask#put`, and that's what your test is guarding against. 
   
   My point is to add the missing test case for when the exception on `close` 
is the only exception that is thrown. There are a variety of ways to do that, 
and I agree with you that this test is not there now. However, I don't think 
this necessarily makes it out of scope. 
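
An illustrative JUnit sketch of that missing case; the helper names are 
hypothetical and this is not the actual WorkerSinkTaskTest (it also assumes 
JUnit 4.13's assertThrows). put() succeeds, only close() throws, and the 
close() exception must still propagate.

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import org.junit.Test;

public class CloseOnlyFailureTest {

    @FunctionalInterface
    interface QuietClosable extends AutoCloseable {
        @Override
        void close();
    }

    // Stand-in for the task loop: the body completes normally, so the
    // close() failure is the only exception and must not be swallowed.
    static void runTaskBody(Runnable put, QuietClosable closePartitions) {
        try (QuietClosable ignored = closePartitions) {
            put.run();
        }
    }

    @Test
    public void closeFailureAloneStillPropagates() {
        RuntimeException thrown = assertThrows(RuntimeException.class, () ->
            runTaskBody(() -> { /* put succeeds */ },
                        () -> { throw new RuntimeException("close failed"); }));
        assertEquals("close failed", thrown.getMessage());
        assertEquals(0, thrown.getSuppressed().length);
    }
}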





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nizhikov commented on pull request #8592: KAFKA-3184: Add Checkpoint for In-memory State Store

2020-05-06 Thread GitBox


nizhikov commented on pull request #8592:
URL: https://github.com/apache/kafka/pull/8592#issuecomment-624815018


   retest this please
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gharris1727 commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

2020-05-06 Thread GitBox


gharris1727 commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420998369



##
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##
@@ -856,6 +858,47 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {

Review comment:
   I've written the test as-is, and the exception from close is caught by 
WorkerTask::doRun.
   
   However, it never gets wrapped in a ConnectException the way exceptions from 
the other connector methods are; I'm not sure that changing this is in scope 
for this PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #7950: KAFKA-9419: Integer Overflow Possible with CircularIterator

2020-05-06 Thread GitBox


kkonstantine commented on pull request #7950:
URL: https://github.com/apache/kafka/pull/7950#issuecomment-624808716


   Definitely. It's a nice fix. Thanks for contributing it @belugabehr 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] belugabehr commented on pull request #7950: KAFKA-9419: Integer Overflow Possible with CircularIterator

2020-05-06 Thread GitBox


belugabehr commented on pull request #7950:
URL: https://github.com/apache/kafka/pull/7950#issuecomment-624803800


   @kkonstantine I appreciate your help getting this over the finish line



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #7950: KAFKA-9419: Integer Overflow Possible with CircularIterator

2020-05-06 Thread GitBox


kkonstantine commented on pull request #7950:
URL: https://github.com/apache/kafka/pull/7950#issuecomment-624801890


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9633) ConfigProvider.close() not called

2020-05-06 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-9633:
--
Fix Version/s: 2.4.2
   2.3.2

> ConfigProvider.close() not called
> -
>
> Key: KAFKA-9633
> URL: https://issues.apache.org/jira/browse/KAFKA-9633
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: patch-available
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> ConfigProvider extends Closeable, but in the following contexts the 
> {{close()}} method is never called:
> 1. AbstractConfig
> 2. WorkerConfigTransformer
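
A hedged sketch of the shape of the fix, with simplified names (not the actual 
patch): whichever component instantiates the providers also owns closing them.

{code:java}
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

final class ProviderLifecycleSketch {
    // ConfigProvider extends Closeable, so the owner can close them uniformly.
    static void closeAll(List<? extends Closeable> providers) {
        for (Closeable provider : providers) {
            try {
                provider.close();
            } catch (IOException e) {
                // Log and continue; one failing provider should not block the rest.
            }
        }
    }
}
{code}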



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #8239: KAFKA-9666: Don't increase transactional epoch when trying to fence if the log append fails

2020-05-06 Thread GitBox


hachikuji commented on a change in pull request #8239:
URL: https://github.com/apache/kafka/pull/8239#discussion_r420974886



##
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##
@@ -487,6 +487,33 @@ class TransactionCoordinator(brokerId: Int,
           info(s"Aborting sending of transaction markers and returning $error error to client for $transactionalId's EndTransaction request of $txnMarkerResult, " +
             s"since appending $newMetadata to transaction log with coordinator epoch $coordinatorEpoch failed")
 
+          txnManager.getTransactionState(transactionalId).right.foreach {

Review comment:
   Took me a while to remember this issue... So basically the coordinator 
has decided to abort a transaction and has bumped the epoch. However, when it 
tries to write the updated state to the log, it fails, which leaves us in an 
inconsistent state. Of course failing to write to the log doesn't necessarily 
mean that the entry wasn't appended. In fact, it could still become committed. 
There is no way to take the write back once it gets to the log. Hence I'm a 
little hesitant about the logic to revert to the previous epoch in this case. 
Would it still be possible for the fenced producer to make progress with the 
old epoch after reverting? Perhaps another idea would be to keep the epoch 
bumped in memory, but remember the fact that the write had failed. So the next 
time we go to retry the abort, we do not need to bump the epoch again. Does 
that make sense?
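
A hedged sketch of that alternative, with hypothetical names rather than the 
actual coordinator code: keep the bumped epoch in memory, record that the log 
write failed, and skip the extra bump when the abort is retried.

final class TxnEpochState {
    private short producerEpoch;
    private boolean pendingFenceWriteFailed = false;

    TxnEpochState(short initialEpoch) {
        this.producerEpoch = initialEpoch;
    }

    // Called when the coordinator decides to fence and abort.
    synchronized short epochForAbort() {
        if (!pendingFenceWriteFailed) {
            producerEpoch += 1; // first attempt: bump as usual
        }
        // A retry after a failed write reuses the already-bumped epoch,
        // so the fenced producer can never make progress with the old one.
        return producerEpoch;
    }

    // Called with the outcome of appending the transition to the txn log.
    synchronized void onLogAppendResult(boolean succeeded) {
        pendingFenceWriteFailed = !succeeded;
    }
}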





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-05-06 Thread GitBox


C0urante commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-624788853


   Thanks for the review, @gharris1727. I've addressed all of your comments 
with either changes or responses; let me know what you think.
   
   @kkonstantine -- there are a few places with nested callback logic that I've 
left untouched for now just to avoid inflating the diff, but apart from those, 
I've replaced construction of anonymous `Runnable` and `Callback` instances 
with lambda syntax in the `StandaloneHerder` and `DistributedHerder` classes. 
Tried to strike a balance here; if we want to go further we always can.
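
For readers outside the PR, an illustrative before/after of that refactor; the 
Callback interface below merely mirrors org.apache.kafka.connect.util.Callback, 
and handle() is a hypothetical consumer of the result.

public class LambdaRefactorSketch {
    interface Callback<V> {
        void onCompletion(Throwable error, V result);
    }

    static void handle(Throwable error, String result) {
        System.out.println(error != null ? "error: " + error : "ok: " + result);
    }

    public static void main(String[] args) {
        // Before: an anonymous inner class.
        Callback<String> before = new Callback<String>() {
            @Override
            public void onCompletion(Throwable error, String result) {
                handle(error, result);
            }
        };

        // After: the equivalent lambda, as adopted in StandaloneHerder
        // and DistributedHerder.
        Callback<String> after = (error, result) -> handle(error, result);

        before.onCompletion(null, "from anonymous class");
        after.onCompletion(null, "from lambda");
    }
}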



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9830) DeadLetterQueueReporter leaks KafkaProducer instance

2020-05-06 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-9830:
--
Fix Version/s: 2.4.2
   2.3.2

> DeadLetterQueueReporter leaks KafkaProducer instance
> 
>
> Key: KAFKA-9830
> URL: https://issues.apache.org/jira/browse/KAFKA-9830
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> The DeadLetterQueueReporter (introduced by KAFKA-6738) creates a 
> KafkaProducer to report errors to Kafka, but does not clean up the producer, 
> leaving many idle network threads open after tasks are stopped.
> Reproduction steps:
> Start a task that has a non-empty DLQ topic name
> Stop the task
> Observe the list of running threads
> Expected result:
> There is no thread related to the stopped task's DLQ left running
> Actual result:
> There is a thread named something like kafka-producer-network-thread | 
> connector-dlq-producer-task-0" #1234 left running
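
A hedged sketch of the fix's shape, with hypothetical names rather than the 
actual patch: the reporter owns the producer, so it must close it when the 
task stops.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class DlqReporterSketch implements AutoCloseable {
    private final KafkaProducer<byte[], byte[]> producer;

    DlqReporterSketch(Properties producerProps) {
        // producerProps still needs bootstrap.servers and the usual settings.
        this.producer = new KafkaProducer<>(producerProps,
                new ByteArraySerializer(), new ByteArraySerializer());
    }

    @Override
    public void close() {
        // Without this, the producer's network thread outlives the task.
        producer.close();
    }
}
{code}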



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9919) Add logging to KafkaBasedLog

2020-05-06 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-9919:
--
Fix Version/s: 2.4.2
   2.3.2

> Add logging to KafkaBasedLog
> 
>
> Key: KAFKA-9919
> URL: https://issues.apache.org/jira/browse/KAFKA-9919
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> The logging emitted on startup is a little thin, especially in the case where 
> a worker is having trouble reading to the end of its offset, status, or 
> config topic. We should add some {{TRACE}} and possibly {{DEBUG}} level logs 
> to the {{KafkaBasedLog}} class so that it's clearer when this is happening.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante commented on a change in pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-05-06 Thread GitBox


C0urante commented on a change in pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#discussion_r420963881



##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
##
@@ -191,32 +192,61 @@ public synchronized void putConnectorConfig(String connName,
                                                 boolean allowReplace,
                                                 final Callback<Created<ConnectorInfo>> callback) {
         try {
-            if (maybeAddConfigErrors(validateConnectorConfig(config), callback)) {
+            validateConnectorConfig(config, (error, configInfos) -> {
+                if (error != null) {
+                    callback.onCompletion(error, null);
+                    return;
+                }
+
+                requestExecutorService.submit(
+                    () -> putConnectorConfig(connName, config, allowReplace, callback, configInfos)
+                );
+            });
+        } catch (Throwable t) {
+            callback.onCompletion(t, null);
+        }
+    }
+
+    private synchronized void putConnectorConfig(String connName,
+                                                 final Map<String, String> config,
+                                                 boolean allowReplace,
+                                                 final Callback<Created<ConnectorInfo>> callback,
+                                                 ConfigInfos configInfos) {
+        try {
+            if (maybeAddConfigErrors(configInfos, callback)) {
                 return;
             }
 
-            boolean created = false;
+            final boolean created;
             if (configState.contains(connName)) {
                 if (!allowReplace) {
                     callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
                     return;
                 }
-                worker.stopConnector(connName);
+                worker.stopAndAwaitConnector(connName);
+                created = false;
             } else {
                 created = true;
            }
 
             configBackingStore.putConnectorConfig(connName, config);
 
-            if (!startConnector(connName)) {
-                callback.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
-                return;
-            }
+            // startConnector(connName, onStart);
+            startConnector(connName, (error, result) -> {
+                if (error != null) {
+                    callback.onCompletion(error, null);
+                    return;
+                }
 
-            updateConnectorTasks(connName);
-            callback.onCompletion(null, new Created<>(created, createConnectorInfo(connName)));
-        } catch (ConnectException e) {
-            callback.onCompletion(e, null);
+                requestExecutorService.submit(() -> {
+                    synchronized (this) {
Review comment:
   Ah yeah, done. Could have sworn I was getting SpotBugs complaints at one 
point when I tried that, but it seems to work now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gharris1727 commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

2020-05-06 Thread GitBox


gharris1727 commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420950647



##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -193,13 +194,11 @@ public void transitionTo(TargetState state) {
     @Override
     public void execute() {
         initializeAndStart();
-        try {
+        // Make sure any uncommitted data has been committed and the task has
+        // a chance to clean up its state
+        try (QuietClosable ignored = this::closePartitions) {

Review comment:
   I used `ignored` to keep the IDE from flagging the unused variable.
   I can change this to `suppressible`, since that name better describes the 
behavior (suppressible "if another exception occurs first").





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9962) Admin client throws UnsupportedVersion exception when talking to old broker

2020-05-06 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101007#comment-17101007
 ] 

Ismael Juma commented on KAFKA-9962:


Good catch. We should update the compatibility system test for the admin client 
to catch this case.

> Admin client throws UnsupportedVersion exception when talking to old broker
> ---
>
> Key: KAFKA-9962
> URL: https://issues.apache.org/jira/browse/KAFKA-9962
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Affects Versions: 2.3.1, 2.5.0, 2.4.1
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Critical
>
> Users are getting this error when using a client version 2.5 against a 1.1.0 
> cluster/broker.
> {code:java}
> [2020-04-28 01:09:10,663] ERROR Failed to start KSQL 
> (io.confluent.ksql.rest.server.KsqlServerMain:63)
> io.confluent.ksql.util.KsqlServerException: Could not get Kafka authorized 
> operations!
> at 
> io.confluent.ksql.services.KafkaClusterUtil.isAuthorizedOperationsSupported(KafkaClusterUtil.java:51)
> at 
> io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.create(KsqlAuthorizationValidatorFactory.java:52)
> at 
> io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:639)
> at 
> io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:567)
> at 
> io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:100)
> at 
> io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:59)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to 
> write a non-default includeClusterAuthorizedOperations at version 5
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> at 
> io.confluent.ksql.services.KafkaClusterUtil.isAuthorizedOperationsSupported(KafkaClusterUtil.java:49)
> ... 5 more
> Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: 
> Attempted to write a non-default includeClusterAuthorizedOperations at 
> version 5
> {code}
> Looking at KIP-430, it mentions that the client is supposed to handle this 
> case:
>  # Existing clients using older versions will not request authorized 
> operations in Describe requests since the default is to disable this feature. 
> This keeps older clients compatible with newer brokers.
>  # Newer clients connecting to older brokers will use the older protocol 
> version and hence will not request authorized operations.
>  # When the AdminClient is talking to a broker which does not support 
> KIP-430, it will fill in either null or UnsupportedVersionException for the 
> returned ACL operations fields in objects. For example, 
> `ConsumerGroupDescription#authorizedOperations` will be null if the broker 
> did not supply this information. DescribeClusterResult#authorizedOperations 
> will throw an `UnsupportedVersionException` if the broker did not supply this 
> information.
>  # When new operations are added, newer brokers may return operations that 
> are not known to older clients. AdminClient will ignore any bit that is set 
> in authorized_operations that is not known to the client. The 
> Set created by the client from the bits returned by the broker 
> will only include operations that the client knows about.
> I assume that this deployment environment falls under case 2, we have this in 
> the serialization code:
> {code:java}
> if (_version >= 8) {
>     _writable.writeByte(includeClusterAuthorizedOperations ? (byte) 1 : (byte) 0);
> } else {
>     if (includeClusterAuthorizedOperations) {
>         throw new UnsupportedVersionException("Attempted to write a non-default includeClusterAuthorizedOperations at version " + _version);
>     }
> }
> {code}
> It also looks like we blindly set the version independent of the broker’s 
> supported version:
> {code:java}
> MetadataRequest.Builder createRequest(int timeoutMs) {
>     // Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
>     // simplifies communication with older brokers)
>     return new MetadataRequest.Builder(new MetadataRequestData()
>
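
The KIP text quoted above implies the client-side fix: gate the optional field 
on the negotiated request version instead of always setting it. A hedged 
sketch with simplified types follows; it is not the actual AdminClient code.

{code:java}
final class MetadataRequestVersionSketch {
    // Per the serialization code above, the field only exists from v8 on.
    static final short MIN_VERSION_WITH_AUTHORIZED_OPERATIONS = 8;

    static boolean includeClusterAuthorizedOperations(short negotiatedVersion, boolean callerWantsAcls) {
        // Against an old broker, drop the optional field rather than
        // serializing a non-default value the version cannot carry.
        return callerWantsAcls && negotiatedVersion >= MIN_VERSION_WITH_AUTHORIZED_OPERATIONS;
    }

    public static void main(String[] args) {
        System.out.println(includeClusterAuthorizedOperations((short) 5, true)); // false: old broker
        System.out.println(includeClusterAuthorizedOperations((short) 8, true)); // true
    }
}
{code}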

[GitHub] [kafka] hachikuji commented on a change in pull request #8602: KAFKA-9947; Ensure proper shutdown of components in `TransactionsBounceTest`

2020-05-06 Thread GitBox


hachikuji commented on a change in pull request #8602:
URL: https://github.com/apache/kafka/pull/8602#discussion_r420948852



##
File path: core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
##
@@ -156,17 +152,26 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
 
     val expectedValues = (0 until numInputRecords).toSet
     assertEquals(s"Missing messages: ${expectedValues -- recordSet}", expectedValues, recordSet)
+  }
 
-    verifyingConsumer.close()
+  private def createTransactionalProducer(transactionalId: String) = {
+    val props = new Properties()
+    props.put(ProducerConfig.ACKS_CONFIG, "all")
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "512")
+    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
+    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    createProducer(configOverrides = props)

Review comment:
   Nice catch. Yeah, let me fix that.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



