[GitHub] [kafka] showuon commented on pull request #8623: MINOR: Update the documentations

2020-05-18 Thread GitBox


showuon commented on pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#issuecomment-630618738


   Thank you, @kkonstantine, for the many nice catches!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8623: MINOR: Update the documentations

2020-05-18 Thread GitBox


kkonstantine commented on pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#issuecomment-630618055


   ok to test







[GitHub] [kafka] kkonstantine commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-18 Thread GitBox


kkonstantine commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r427064231



##
File path: docs/ops.html
##
@@ -477,19 +477,25 @@ Limiting 
Bandwidth Usage during Da
   Throttle was removed.
 
   The administrator can also validate the assigned configs using the 
kafka-configs.sh. There are two pairs of throttle
-  configuration used to manage the throttling process. The throttle value 
itself. This is configured, at a broker
+  configuration used to manage the throttling process. First pair refers 
to the throttle value itself. This is configured, at a broker
   level, using the dynamic properties: 
 
-  leader.replication.throttled.rate
-  follower.replication.throttled.rate
+  
+leader.replication.throttled.rate
+follower.replication.throttled.rate
+  
+
+  Then there is the configuration pair of enumerated set of throttled 
replicas: 

Review comment:
   ```suggestion
 Then there is the configuration pair of enumerated sets of throttled 
replicas: 
   ```
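   For context, the assigned throttle configs discussed in this review can be
inspected with kafka-configs.sh. A hedged sketch only: the broker id `0`, the
topic name `my-topic`, and the bootstrap address are placeholder assumptions,
not values from this PR.

```shell
# Describe the broker-level throttle rates (the first pair:
# leader/follower.replication.throttled.rate)
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers --entity-name 0 --describe

# Describe the per-topic throttled-replica sets (the second pair:
# leader/follower.replication.throttled.replicas)
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic --describe
```

Both commands require a running broker; output is empty for entities with no
overridden configs.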









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-18 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r427059479



##
File path: docs/ops.html
##
@@ -477,16 +477,20 @@ Limiting 
Bandwidth Usage during Da
   Throttle was removed.
 
   The administrator can also validate the assigned configs using the 
kafka-configs.sh. There are two pairs of throttle
-  configuration used to manage the throttling process. The throttle value 
itself. This is configured, at a broker
+  configuration used to manage the throttling process. This is configured, 
at a broker
   level, using the dynamic properties: 
 
-  leader.replication.throttled.rate
-  follower.replication.throttled.rate
+  
+leader.replication.throttled.rate
+follower.replication.throttled.rate
+  
 
   There is also an enumerated set of throttled replicas: 

Review comment:
   Good suggestion! After the update, it's much clearer!
   
![image](https://user-images.githubusercontent.com/43372967/82292384-6506fb80-99dd-11ea-8e99-15ed86db1186.png)
   









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-18 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r427054507



##
File path: docs/connect.html
##
@@ -129,9 +129,11 @@ TransformationsThe file source connector reads each line as a String. We will wrap 
each line in a Map and then add a second field to identify the origin of the 
event. To do this, we use two transformations:
 
 HoistField to place the input line inside a Map
-InsertField to add the static field. In this example we'll 
indicate that the record came from a file connector

Review comment:
   Yes, you're right! After a second reading, it indeed refers to the content 
coming from a file in `InsertField`. I'll revert this change. Thanks.









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-18 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r427051965



##
File path: docs/connect.html
##
@@ -103,7 +103,7 @@ Configuring Connecto
 topics.regex - A Java regular expression of topics to 
use as input for this connector
 
 
-For any other options, you should consult the documentation for the 
connector.
+For any other options, you should consult the documentation for the connector.

Review comment:
   Yes, you're right. I read it again, and it indeed refers to the docs of 
each individual connector. I'll revert this change. Thanks.









[GitHub] [kafka] kkonstantine commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-18 Thread GitBox


kkonstantine commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r427036583



##
File path: docs/connect.html
##
@@ -129,9 +129,11 @@ TransformationsThe file source connector reads each line as a String. We will wrap 
each line in a Map and then add a second field to identify the origin of the 
event. To do this, we use two transformations:
 
 HoistField to place the input line inside a Map
-InsertField to add the static field. In this example we'll 
indicate that the record came from a file connector

Review comment:
   This is also not correct. 
   This refers to what we say above as: _then add a second field to identify 
the origin of the event_
   So this correctly refers to what will be added in this new field when using 
`InsertField`

##
File path: docs/security.html
##
@@ -398,7 +398,7 @@ Host Name Verification
 ssl.keystore.password=test1234
 ssl.key.password=test1234
 
-Other configuration settings that may also be needed depending on 
our requirements and the broker configuration:
+Other configuration settings that may also be needed depending on 
requirements and the broker configuration:

Review comment:
   I'm not sure this is redundant. This refers to the user's requirements. 

##
File path: docs/connect.html
##
@@ -103,7 +103,7 @@ Configuring Connecto
 topics.regex - A Java regular expression of topics to 
use as input for this connector
 
 
-For any other options, you should consult the documentation for the 
connector.
+For any other options, you should consult the documentation for the connector.

Review comment:
   That's not accurate. The documentation for the connector is not the same 
as the Worker configs. 
   This indeed refers to the docs of each individual connector

##
File path: docs/ops.html
##
@@ -477,16 +477,20 @@ Limiting 
Bandwidth Usage during Da
   Throttle was removed.
 
   The administrator can also validate the assigned configs using the 
kafka-configs.sh. There are two pairs of throttle
-  configuration used to manage the throttling process. The throttle value 
itself. This is configured, at a broker
+  configuration used to manage the throttling process. This is configured, 
at a broker

Review comment:
   This is not redundant. It refers to the two pairs of properties shown 
below.  You can say instead: 
   
   ```suggestion
 configuration used to manage the throttling process. First pair refers 
to the throttle value itself. This is configured, at a broker
   ```
   
   then below you could amend the sentence to say: 
   _There is also an enumerated set of throttled replicas:_ -> _Then there is 
the configuration pair of enumerated set of throttled replicas:_

##
File path: docs/ops.html
##
@@ -477,16 +477,20 @@ Limiting 
Bandwidth Usage during Da
   Throttle was removed.
 
   The administrator can also validate the assigned configs using the 
kafka-configs.sh. There are two pairs of throttle
-  configuration used to manage the throttling process. The throttle value 
itself. This is configured, at a broker
+  configuration used to manage the throttling process. This is configured, 
at a broker
   level, using the dynamic properties: 
 
-  leader.replication.throttled.rate
-  follower.replication.throttled.rate
+  
+leader.replication.throttled.rate
+follower.replication.throttled.rate
+  
 
   There is also an enumerated set of throttled replicas: 

Review comment:
   Then here you could amend the sentence to say:
   ```suggestion
 Then there is the configuration pair of throttled replicas: 
   ```

##
File path: docs/ops.html
##
@@ -477,16 +477,20 @@ Limiting 
Bandwidth Usage during Da
   Throttle was removed.
 
   The administrator can also validate the assigned configs using the 
kafka-configs.sh. There are two pairs of throttle
-  configuration used to manage the throttling process. The throttle value 
itself. This is configured, at a broker
+  configuration used to manage the throttling process. This is configured, 
at a broker
   level, using the dynamic properties: 
 
-  leader.replication.throttled.rate
-  follower.replication.throttled.rate
+  
+leader.replication.throttled.rate
+follower.replication.throttled.rate
+  
 
   There is also an enumerated set of throttled replicas: 
 
-  leader.replication.throttled.replicas
-  follower.replication.throttled.replicas
+  
+leader.replication.throttled.replicas
+follower.replication.throttled.replicas
+  
 
   Which are configured per topic. All four config values are automatically 
assigned by kafka-reassign-partitions.sh

Review comment:
   Then you can add a break here: 
   
   ```suggestion
 Which are configured per topic. 
  All four config values are automatically assigned 

[GitHub] [kafka] showuon commented on pull request #8623: MINOR: Update the documentations

2020-05-18 Thread GitBox


showuon commented on pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#issuecomment-630580908


   Hi @cmccabe @guozhangwang @gwenshap, a simple doc fix/update to 
connect.html/ops.html/security.html. Please help review. Thanks.







[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-18 Thread GitBox


showuon commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r427028165



##
File path: docs/streams/upgrade-guide.html
##
@@ -35,7 +35,7 @@ Upgrade Guide and API Changes
 
 
 Upgrading from any older version to {{fullDotVersion}} is possible: 
you will need to do two rolling bounces, where during the first rolling bounce 
phase you set the config upgrade.from="older version"
-(possible values are "0.10.0" - "2.4") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager
+(possible values are "0.10.0" - "2.3") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager

Review comment:
   I've also updated it in the kafka-site repo: 
https://github.com/apache/kafka-site/pull/265/commits/513a8205e6b115ca4f876aa5d95d3756061266d5.
 Thank you.
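   The two-rolling-bounce procedure this doc change describes can be sketched
as follows. A hedged example: `streams.properties` and the value `2.3` are
placeholders for your own Streams config file and the version you are actually
upgrading from.

```shell
# First rolling bounce: set upgrade.from to the version you are upgrading FROM
# (possible values are "0.10.0" - "2.3"), then restart each instance in turn.
echo 'upgrade.from=2.3' >> streams.properties

# Second rolling bounce: remove the config and restart each instance again,
# completing the switch to the cooperative rebalancing protocol.
sed -i '/^upgrade\.from=/d' streams.properties
```

Note that `sed -i` edits the file in place (GNU sed syntax); how the property
is actually set depends on how your application builds its StreamsConfig.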









[jira] [Updated] (KAFKA-10020) KIP-616: Rename implicit Serdes instances in kafka-streams-scala

2020-05-18 Thread Yuriy Badalyantc (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuriy Badalyantc updated KAFKA-10020:
-
  Component/s: streams
Affects Version/s: 2.5.0
 Priority: Minor  (was: Major)

> KIP-616: Rename implicit Serdes instances in kafka-streams-scala
> 
>
> Key: KAFKA-10020
> URL: https://issues.apache.org/jira/browse/KAFKA-10020
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Yuriy Badalyantc
>Priority: Minor
>
> To fix the name clash, the names of the implicits in 
> org.apache.kafka.streams.scala.Serdes should be changed. Details are in the 
> [KIP-616|https://cwiki.apache.org/confluence/display/KAFKA/KIP-616%3A+Rename+implicit+Serdes+instances+in+kafka-streams-scala].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10020) KIP-616: Rename implicit Serdes instances in kafka-streams-scala

2020-05-18 Thread Yuriy Badalyantc (Jira)
Yuriy Badalyantc created KAFKA-10020:


 Summary: KIP-616: Rename implicit Serdes instances in 
kafka-streams-scala
 Key: KAFKA-10020
 URL: https://issues.apache.org/jira/browse/KAFKA-10020
 Project: Kafka
  Issue Type: Improvement
Reporter: Yuriy Badalyantc


To fix the name clash, the names of the implicits in 
org.apache.kafka.streams.scala.Serdes should be changed. Details are in the 
[KIP-616|https://cwiki.apache.org/confluence/display/KAFKA/KIP-616%3A+Rename+implicit+Serdes+instances+in+kafka-streams-scala].





[GitHub] [kafka] LMnet commented on a change in pull request #8049: MINOR: Added missing default serdes to the streams.scala.Serdes

2020-05-18 Thread GitBox


LMnet commented on a change in pull request #8049:
URL: https://github.com/apache/kafka/pull/8049#discussion_r427015288



##
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
##
@@ -30,12 +32,15 @@ object Serdes {
   implicit def JavaLong: Serde[java.lang.Long] = JSerdes.Long()
   implicit def ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
   implicit def Bytes: Serde[org.apache.kafka.common.utils.Bytes] = 
JSerdes.Bytes()
+  implicit def byteBufferSerde: Serde[ByteBuffer] = JSerdes.ByteBuffer()
+  implicit def shortSerde: Serde[Short] = 
JSerdes.Short().asInstanceOf[Serde[Short]]
   implicit def Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
   implicit def JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
   implicit def Double: Serde[Double] = 
JSerdes.Double().asInstanceOf[Serde[Double]]
   implicit def JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
   implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
   implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
+  implicit def uuidSerde: Serde[UUID] = JSerdes.UUID()

Review comment:
   I created the KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-616%3A+Rename+implicit+Serdes+instances+in+kafka-streams-scala









[jira] [Updated] (KAFKA-10019) MirrorMaker 2 did not function properly after restart (message lost, messages arriving slowly)

2020-05-18 Thread Kay (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kay updated KAFKA-10019:

Description: 
MM2 did not function properly after stopping a running MM2 process and then 
starting it again. The consumer did not receive all messages (even messages 
sent after MM2 restarted). The messages arriving at the consumer no longer 
matched the rate specified by "--messages" and "--timeout".

To reproduce the issue
 # Environment:
 ## Region 1: one Kafka cluster, two MM2 instances, 1 producer instance
 ## Region 2: one Kafka cluster, two MM2 instances, 1 consumer instance
 # Producer (in region 1) started sending 1000 messages.
 ## ./bin/kafka-producer-perf-test.sh --producer.config 
config/producer.properties --topic topic1 --record-size 480 --num-records 
1000 --throughput 17
 # Consumer (in region 2) started receiving messages.
 ## while true; do ./bin/kafka-consumer-perf-test.sh --threads 60 *--timeout 
5000* --consumer.config config/consumer.properties --topic region1.topic1 
*--messages 250* --group region2-consume-region1topic1 --broker-list 
$KAFKA_BROKERS; done > consumer.log &
 # Consumer received the first 500 messages (250, 250), as "--message" 
specified.
 # Killed the MM2 process on one of two instances in both regions.
 # Consumer started receiving the remaining messages at a much slower "rate" 
(160, 29, 19, 11, 9, 6, 5, 5, 0,.. 3, 0,... 2, 0,... 1).
 # Restarted the MM2 processes killed at (4).
 # Producer sent another 1000 messages.
 # Still, messages no longer arrived at the "--message" rate (250 * N), but 
e.g. 37, 30, 23, 13, 9, 0, 1, 3...
 # And consumer did not receive all new 1000 messages sent after MM2 restarted. 

Please see the producer and consumer log files attached. In the consumer log 
file, you can see that after the first 2 consecutive "250" messages arrived, 
the message arrived differently.

*Issue Summary*
 # MM2 does not recover from restarting its process.
 # After killing a MM2 process in the MM2 EC2 instance, a Consumer no longer 
received the messages at the rate of "--message" and "--timeout".
 # Consumer did not receive all messages, even though those messages were 
published after the MM2 process restarted.
 # Consumer no longer received messages at the rate of "--messages" and 
"--timeout", even after the MM2 process restarted.

  was:
MM2 did not function properly after stopping a running MM2 process then 
starting it again. Consumer did not receive all messages (even messages being 
sent after MM2 restarted).  The messages arriving to the consumer were no 
longer at the rate as specified in "--message" and "--timeout".

To reproduce the issue
 # Environment:
 ## Region 1: one Kafka cluster, two MM2 instances, 1 producer instance
 ## Region 2: one Kafka cluster, two MM2 instances, 1 consumer instance
 # **Producer (in region 1) started sending 1000 messages.

 ## ./bin/kafka-producer-perf-test.sh --producer.config 
config/producer.properties --topic topic1 --record-size 480 --num-records 
1000 --throughput 17
 # Consumer (in region 2) started receiving messages.
 ## while true; do ./bin/kafka-consumer-perf-test.sh --threads 60 *--timeout 
5000* --consumer.config config/consumer.properties --topic region1.topic1 
*--messages 250* --group region2-consume-region1topic1 --broker-list 
$KAFKA_BROKERS; done > consumer.log &
 # Consumer received the first 500 messages (250, 250), as "--message" 
specified.
 # Killed the MM2 process on one of two instances in both regions.

 # Consumer started receiving the remaining messages at a much slower "rate" 
(160, 29, 19, 11, 9, 6, 5, 5, 0,.. 3, 0,... 2, 0,... 1).

 # Restarted the MM2 processes killed at (4).

 # Producer sent another 1000 messages.

 # Still, messages no longer arrived at the "--message" rate (250 * N), but 
e.g. 37, 30, 23, 13, 9, 0, 1, 3...

 # And consumer did not receive all new 1000 messages sent after MM2 restarted. 

Please see the producer and consumer log files attached.

In the consumer log file, you can see that after the first 2 consecutive "250" 
messages arrived, the message arrived differently.

*Issue Summary*
 # MM2 does not recover from restarting its process.
 # After killing a MM2 process in the MM2 EC2 instance, a Consumer no longer 
received the messages at the rate of "--message" and "--timeout".

 # Consumer did not receive all messages even those messages were published 
after the mm2 process restarted.

 # Consumer no longer received messages at the rate of "--message" and 
"-timeout" even after the mm2 process restarted.


> MirrorMaker 2 did not function properly after restart (message lost, messages 
> arriving slowly)
> --
>
> Key: KAFKA-10019
> URL: https://issues.apache.org/jira/browse/KAFKA-10019
> Project: Kafka
>  Issue Type: Bug
> 

[jira] [Created] (KAFKA-10019) MirrorMaker 2 did not function properly after restart (message lost, messages arriving slowly)

2020-05-18 Thread Kay (Jira)
Kay created KAFKA-10019:
---

 Summary: MirrorMaker 2 did not function properly after restart 
(message lost, messages arriving slowly)
 Key: KAFKA-10019
 URL: https://issues.apache.org/jira/browse/KAFKA-10019
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.4.1
 Environment: Amazon Linux 2
MSK clusters: kafka.m5.large, 3 AZ, 3 brokers
MM2 instances: c5.2xlarge
Producer/Consumer instances: c5.2xlarge
Reporter: Kay
 Attachments: 2a-consumer.log, 2a-producer.log

MM2 did not function properly after stopping a running MM2 process then 
starting it again. Consumer did not receive all messages (even messages being 
sent after MM2 restarted).  The messages arriving to the consumer were no 
longer at the rate as specified in "--message" and "--timeout".

To reproduce the issue
 # Environment:
 ## Region 1: one Kafka cluster, two MM2 instances, 1 producer instance
 ## Region 2: one Kafka cluster, two MM2 instances, 1 consumer instance
 # **Producer (in region 1) started sending 1000 messages.

 ## ./bin/kafka-producer-perf-test.sh --producer.config 
config/producer.properties --topic topic1 --record-size 480 --num-records 
1000 --throughput 17
 # Consumer (in region 2) started receiving messages.
 ## while true; do ./bin/kafka-consumer-perf-test.sh --threads 60 *--timeout 
5000* --consumer.config config/consumer.properties --topic region1.topic1 
*--messages 250* --group region2-consume-region1topic1 --broker-list 
$KAFKA_BROKERS; done > consumer.log &
 # Consumer received the first 500 messages (250, 250), as "--message" 
specified.
 # Killed the MM2 process on one of two instances in both regions.

 # Consumer started receiving the remaining messages at a much slower "rate" 
(160, 29, 19, 11, 9, 6, 5, 5, 0,.. 3, 0,... 2, 0,... 1).

 # Restarted the MM2 processes killed at (4).

 # Producer sent another 1000 messages.

 # Still, messages no longer arrived at the "--message" rate (250 * N), but 
e.g. 37, 30, 23, 13, 9, 0, 1, 3...

 # And consumer did not receive all new 1000 messages sent after MM2 restarted. 

Please see the producer and consumer log files attached.

In the consumer log file, you can see that after the first 2 consecutive "250" 
messages arrived, the message arrived differently.

*Issue Summary*
 # MM2 does not recover from restarting its process.
 # After killing a MM2 process in the MM2 EC2 instance, a Consumer no longer 
received the messages at the rate of "--message" and "--timeout".

 # Consumer did not receive all messages even those messages were published 
after the mm2 process restarted.

 # Consumer no longer received messages at the rate of "--message" and 
"-timeout" even after the mm2 process restarted.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-18 Thread GitBox


ableegoldman commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r427019844



##
File path: docs/streams/upgrade-guide.html
##
@@ -35,7 +35,7 @@ Upgrade Guide and API Changes
 
 
 Upgrading from any older version to {{fullDotVersion}} is possible: 
you will need to do two rolling bounces, where during the first rolling bounce 
phase you set the config upgrade.from="older version"
-(possible values are "0.10.0" - "2.4") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager
+(possible values are "0.10.0" - "2.3") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager

Review comment:
   Awesome, thank you!









[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-18 Thread GitBox


showuon commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r427018759



##
File path: docs/streams/upgrade-guide.html
##
@@ -35,7 +35,7 @@ Upgrade Guide and API Changes
 
 
 Upgrading from any older version to {{fullDotVersion}} is possible: 
you will need to do two rolling bounces, where during the first rolling bounce 
phase you set the config upgrade.from="older version"
-(possible values are "0.10.0" - "2.4") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager
+(possible values are "0.10.0" - "2.3") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager

Review comment:
   Thanks for the suggestion, @ableegoldman, it makes it much clearer! I've 
updated it in this commit 
https://github.com/apache/kafka/pull/8622/commits/a3accf681e72bbba9a774a464253c1ccd7746188.
 Thank you.









[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2020-05-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110830#comment-17110830
 ] 

Matthias J. Sax commented on KAFKA-6520:


[~guozhang] I think your idea about leveraging KIP-572 might not work. I dug 
through the code and none of the blocking calls that might throw a 
`TimeoutException` are on the regular processing code path. Blocking calls are 
made only during task initialization or restore. During normal processing, 
only `poll()` / `pause()` / `resume()` are called, and those methods don't throw 
a `TimeoutException`. Thoughts?

[~VinceMu] Yes, the main purpose is to have a KafkaStreams client state 
DISCONNECTED. Thread state is an internal implementation detail.

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which leads me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the 
> IOException / RuntimeException which indicates the connection disconnected.
>  * And then users of Consumer / Streams can monitor on this metric, which 
> normally will only have close to zero values as we have transient 
> disconnects; if it is spiking, it means the brokers are consistently 
> unavailable, indicating the disconnected state.
> [~Yohan123] WDYT?







[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2020-05-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110829#comment-17110829
 ] 

Matthias J. Sax commented on KAFKA-4327:


Thanks [~jeqo]. The original idea was to move it to the `streams` module. Why do 
you want to move it to the `tools` module? Or are you referring to moving it to 
the _package_ {{o.a.k.streams.tools}}?

Which parser we use is an implementation detail and does not need to be 
discussed in a KIP. I am fine with your proposal.

However, moving the class to a different module/package and removing the zookeeper 
parameter is a breaking change. Thus, we can work on this ticket only for the 
3.0.0 release. It's unclear atm what the release number after 2.6 will be. 
There was a discussion about maybe doing a major release, but as long as the decision 
is not made, I would recommend holding off. Writing the KIP could be done right 
now, but working on a PR would be too early, as we don't know if/when it could 
get merged.

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support the "topic delete request" and 
> thus the reset tool will not be backward compatible with all broker versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
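The suggested change in the ticket swaps a hardcoded class name in the exception message for one derived from the class itself, so the hint stays correct if the tool is moved. A minimal Java sketch of the idea (the `StreamsResetter` stub and the `resetHint` helper below are placeholders, not the real tool):

```java
// Placeholder class standing in for kafka.tools.StreamsResetter; the real
// tool lives in the core module today and may move per this ticket.
class StreamsResetter { }

public class Main {
    // Deriving the name from the class keeps the error message correct
    // even if the tool is later moved to another module or package.
    static String resetHint() {
        return "Use '" + StreamsResetter.class.getName() + "' tool";
    }

    public static void main(String[] args) {
        // Prints the hint, e.g. Use 'StreamsResetter' tool
        System.out.println(resetHint());
    }
}
```

With this, a package move changes the message automatically instead of leaving a stale hardcoded string behind.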


[GitHub] [kafka] ableegoldman commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-18 Thread GitBox


ableegoldman commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r427013934



##
File path: docs/streams/upgrade-guide.html
##
@@ -35,7 +35,7 @@ Upgrade Guide and API Changes
 
 
 Upgrading from any older version to {{fullDotVersion}} is possible: 
you will need to do two rolling bounces, where during the first rolling bounce 
phase you set the config upgrade.from="older version"
-(possible values are "0.10.0" - "2.4") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager
+(possible values are "0.10.0" - "2.3") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager

Review comment:
   Can we just add a small qualifier in the first line?
   `you will need to do two rolling bounces` --> `if upgrading from 2.3 or 
below, you will need to do two rolling bounces`





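The two rolling bounces discussed in this upgrade-guide thread can be sketched with plain `java.util.Properties`. `upgrade.from` is the real Streams config key; the application id and bootstrap servers below are placeholders:

```java
import java.util.Properties;

public class Main {
    // First rolling bounce: pin the rebalance protocol to the old version.
    static Properties firstBounceConfig() {
        Properties p = new Properties();
        p.put("application.id", "my-streams-app");    // placeholder app id
        p.put("bootstrap.servers", "localhost:9092"); // placeholder brokers
        // Only needed when upgrading from 2.3 or below.
        p.put("upgrade.from", "2.3");
        return p;
    }

    // Second rolling bounce: same config with upgrade.from removed, which
    // lets the group switch to the cooperative rebalancing protocol.
    static Properties secondBounceConfig() {
        Properties p = firstBounceConfig();
        p.remove("upgrade.from");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(firstBounceConfig().getProperty("upgrade.from")); // prints 2.3
        System.out.println(secondBounceConfig().containsKey("upgrade.from")); // prints false
    }
}
```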




[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-18 Thread GitBox


showuon commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r427012797



##
File path: docs/streams/upgrade-guide.html
##
@@ -35,7 +35,7 @@ Upgrade Guide and API Changes
 
 
 Upgrading from any older version to {{fullDotVersion}} is possible: 
you will need to do two rolling bounces, where during the first rolling bounce 
phase you set the config upgrade.from="older version"
-(possible values are "0.10.0" - "2.4") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager
+(possible values are "0.10.0" - "2.3") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager

Review comment:
   Hi @ableegoldman, after reading the whole paragraph again, I think 
you're right. 
   > (possible values are "0.10.0" - "2.3") and during the second you remove 
it. 
   > This is required to safely upgrade to the new cooperative rebalancing 
protocol of the embedded consumer.   
   > you can safely switch over to cooperative at any time once the entire 
group is on 2.4+ by removing the config value and bouncing.
   
   So, because we explicitly said the cooperative rebalancing protocol is 
available since 2.4+, I think keeping it as `2.3` here is fine and 
correct. 
   
   Thank you.









[GitHub] [kafka] ableegoldman commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-18 Thread GitBox


ableegoldman commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r427009991



##
File path: docs/streams/upgrade-guide.html
##
@@ -35,7 +35,7 @@ Upgrade Guide and API Changes
 
 
 Upgrading from any older version to {{fullDotVersion}} is possible: 
you will need to do two rolling bounces, where during the first rolling bounce 
phase you set the config upgrade.from="older version"
-(possible values are "0.10.0" - "2.4") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager
+(possible values are "0.10.0" - "2.3") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager

Review comment:
   @showuon Can we clarify that you only need to do this if you're 
upgrading from 2.3 or below? I know this seems implied by the fact that the 
config's possible values stop at 2.3 but there are always creative 
interpretations of seemingly obvious things 🙂 









[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-18 Thread GitBox


showuon commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r427009933



##
File path: docs/streams/upgrade-guide.html
##
@@ -35,7 +35,7 @@ Upgrade Guide and API Changes
 
 
 Upgrading from any older version to {{fullDotVersion}} is possible: 
you will need to do two rolling bounces, where during the first rolling bounce 
phase you set the config upgrade.from="older version"
-(possible values are "0.10.0" - "2.3") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager
+(possible values are "0.10.0" - "2.4") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager

Review comment:
   Thank you, @ableegoldman @abbccdda @bbejeck , I've reverted back this 
version to `2.3` now in this commit 
https://github.com/apache/kafka/pull/8622/commits/460768e71f5c7d427a6faffdded9b0478ade1db1.
 Thank you very much!









[GitHub] [kafka] ableegoldman commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent

2020-05-18 Thread GitBox


ableegoldman commented on a change in pull request #8681:
URL: https://github.com/apache/kafka/pull/8681#discussion_r427006280



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
 }
 
 if (stores.containsKey(storeName)) {
-throw new IllegalArgumentException(format("%sStore %s has already 
been registered.", logPrefix, storeName));
+log.warn("State Store {} has already been registered, which could 
be due to a half-way registration" +

Review comment:
   Re: your concern, I don't think we can assume that a user's state 
store's `init` method is idempotent. AFAIK nothing relevant to the state store 
registration should change, but if something does (e.g. TaskCorrupted) we'd 
have to wipe out everything and start it all again anyway





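The thread above debates warning versus throwing on a duplicate store registration. A toy sketch of the idempotent variant under discussion (the names and the plain `Map` registry are illustrative; the real class is `ProcessorStateManager` with restore callbacks this sketch omits):

```java
import java.util.HashMap;
import java.util.Map;

public class Main {
    // Toy stand-in for the state store registry.
    static final Map<String, Object> stores = new HashMap<>();

    // Idempotent variant: a duplicate registration is logged and skipped
    // rather than failing the task with an IllegalArgumentException.
    static boolean registerStore(String name, Object store) {
        if (stores.containsKey(name)) {
            System.out.println("WARN: store " + name + " already registered; skipping");
            return false; // the second registration is a no-op
        }
        stores.put(name, store);
        return true;
    }

    public static void main(String[] args) {
        System.out.println(registerStore("counts", new Object())); // prints true
        // Second call logs the WARN line, then prints false.
        System.out.println(registerStore("counts", new Object()));
    }
}
```

The trade-off raised in the review still applies: skipping assumes the second registration would have produced the same store, which is exactly what the reviewers are debating.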




[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-18 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r425522820



##
File path: docs/security.html
##
@@ -1438,7 +1438,7 @@ Examples
 bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:Bob --allow-principal User:Alice --allow-host 
198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write 
--topic Test-topic
 By default, all principals that don't have an explicit acl that 
allows access for an operation to a resource are denied. In rare cases where an 
allow acl is defined that allows access to all but some principal we will have 
to use the --deny-principal and --deny-host option. For example, if we want to 
allow all users to Read from Test-topic but only deny User:BadBob from IP 
198.51.100.3 we can do so using following commands:
 bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:* --allow-host * --deny-principal User:BadBob 
--deny-host 198.51.100.3 --operation Read --topic Test-topic
-Note that ``--allow-host`` and ``deny-host`` only support IP 
addresses (hostnames are not supported).
+Note that --allow-host and --deny-host 
only support IP addresses (hostnames are not supported).

Review comment:
   The ` `` ` formatting isn't used anywhere else in the documentation. 
Replace it with `<code>` formatting here.
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82280446-793cff80-99c1-11ea-8d64-1d58a51da62a.png)
   
   after:
   
![image](https://user-images.githubusercontent.com/43372967/82002970-ce130a00-9691-11ea-8ffc-8ed41b3a55a4.png)
   









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-18 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r420534565



##
File path: docs/security.html
##
@@ -398,7 +398,7 @@ Host Name Verification
 ssl.keystore.password=test1234
 ssl.key.password=test1234
 
-Other configuration settings that may also be needed depending on 
our requirements and the broker configuration:
+Other configuration settings that may also be needed depending on 
requirements and the broker configuration:

Review comment:
   Remove the redundant `our` here
   
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82280333-40048f80-99c1-11ea-9cb0-be70c54a6745.png)
   
   









[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations

2020-05-18 Thread GitBox


showuon commented on a change in pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#discussion_r420534465



##
File path: docs/security.html
##
@@ -361,9 +361,9 @@ Host Name Verification
 
 
 The JRE/JDK will have a default pseudo-random number generator 
(PRNG) that is used for cryptography operations, so it is not required to 
configure the
-implementation used with the 
ssl.secure.random.implementation. However, there are performance 
issues with some implementations (notably, the
-default chosen on Linux systems, NativePRNG, utilizes a 
global lock). In cases where performance of SSL connections becomes an issue,
-consider explicitly setting the implementation to be used. The 
SHA1PRNG implementation is non-blocking, and has shown very good 
performance
+implementation used with the 
ssl.secure.random.implementation. However, there are performance 
issues with some implementations (notably, the
+default chosen on Linux systems, NativePRNG, utilizes 
a global lock). In cases where performance of SSL connections becomes an issue,
+consider explicitly setting the implementation to be used. The 
SHA1PRNG implementation is non-blocking, and has shown very good 
performance

Review comment:
   Fix the incorrect content formatting.
   
   **Before:**
   
![圖片](https://user-images.githubusercontent.com/43372967/81137668-1fd0db80-8f92-11ea-9502-026dba38b031.png)
   
   **After:**
   
![圖片](https://user-images.githubusercontent.com/43372967/81137626-0039b300-8f92-11ea-818e-15b4aed2270b.png)
   









[GitHub] [kafka] abbccdda commented on pull request #8682: KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll

2020-05-18 Thread GitBox


abbccdda commented on pull request #8682:
URL: https://github.com/apache/kafka/pull/8682#issuecomment-630551559


   Will attempt Sophie's suggestion here







[GitHub] [kafka] ableegoldman commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent

2020-05-18 Thread GitBox


ableegoldman commented on a change in pull request #8681:
URL: https://github.com/apache/kafka/pull/8681#discussion_r427003560



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
 }
 
 if (stores.containsKey(storeName)) {
-throw new IllegalArgumentException(format("%sStore %s has already 
been registered.", logPrefix, storeName));
+log.warn("State Store {} has already been registered, which could 
be due to a half-way registration" +

Review comment:
   Nah, I think we should actually keep this (although 
`IllegalStateException` seems to make more sense, can we change it?) -- we 
should just make sure we don't reach it 









[jira] [Updated] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9989:
---
Attachment: 166.tgz

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: HaiyuanZhao
>Priority: Major
>  Labels: newbie
> Attachments: 166.tgz
>
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which, if we take a closer look, shows that the rebalance happens but has no task 
> assignment. We should fix this problem by making the rebalance result part 
> of the check, and waiting for a finalized (non-empty) assignment before 
> kicking off the record processing validation. 





[GitHub] [kafka] abbccdda commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent

2020-05-18 Thread GitBox


abbccdda commented on a change in pull request #8681:
URL: https://github.com/apache/kafka/pull/8681#discussion_r426998545



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
 }
 
 if (stores.containsKey(storeName)) {
-throw new IllegalArgumentException(format("%sStore %s has already 
been registered.", logPrefix, storeName));
+log.warn("State Store {} has already been registered, which could 
be due to a half-way registration" +

Review comment:
   So are we still required to remove the illegal argument exception here? 
What I'm concerned about is that the latest version of the state store 
initialization might be different from the previous iteration, so it's safer 
to just go through the entire procedure once more.









[GitHub] [kafka] abbccdda commented on a change in pull request #8680: KAFKA-9755: Implement read path for feature versioning system (KIP-584)

2020-05-18 Thread GitBox


abbccdda commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r426884892



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/VersionLevelRange.java
##
@@ -0,0 +1,39 @@
+package org.apache.kafka.common.feature;
+
+import java.util.Map;
+
+/**
+ * A specialization of VersionRange representing a range of version levels. 
The main specialization
+ * is that the class uses different serialization keys for min/max attributes.
+ *
+ * NOTE: This is the backing class used to define the min/max version levels 
for finalized features.
+ */
+public class VersionLevelRange extends VersionRange {

Review comment:
   In terms of naming, do you think `FinalizedVersionRange` is more 
explicit? Also, looking closer at the class hierarchy, I feel the parts shared 
between the finalized version range and the supported version range should be 
extracted to avoid weird inheritance. What I'm proposing is to have 
`VersionRange` as a superclass with two subclasses, `SupportedVersionRange` 
and `FinalizedVersionRange`, and to make `minKeyLabel` and `maxKeyLabel` 
abstract methods. WDYT?

##
File path: 
clients/src/test/java/org/apache/kafka/common/feature/VersionRangeTest.java
##
@@ -0,0 +1,150 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class VersionRangeTest {
+@Test
+public void testFailDueToInvalidParams() {
+// min and max can't be < 1.
+assertThrows(
+IllegalArgumentException.class,
+() -> new VersionRange(0, 0));
+assertThrows(
+IllegalArgumentException.class,
+() -> new VersionRange(-1, -1));
+// min can't be < 1.
+assertThrows(
+IllegalArgumentException.class,
+() -> new VersionRange(0, 1));
+assertThrows(
+IllegalArgumentException.class,
+() -> new VersionRange(-1, 1));
+// max can't be < 1.
+assertThrows(
+IllegalArgumentException.class,
+() -> new VersionRange(1, 0));
+assertThrows(
+IllegalArgumentException.class,
+() -> new VersionRange(1, -1));
+// min can't be > max.
+assertThrows(
+IllegalArgumentException.class,
+() -> new VersionRange(2, 1));
+}
+
+@Test
+public void testSerializeDeserializeTest() {
+VersionRange versionRange = new VersionRange(1, 2);
+assertEquals(1, versionRange.min());
+assertEquals(2, versionRange.max());
+
+Map<String, Long> serialized = versionRange.serialize();
+assertEquals(
+new HashMap<String, Long>() {
+{
+put("min_version", versionRange.min());
+put("max_version", versionRange.max());
+}
+},
+serialized
+);
+
+VersionRange deserialized = VersionRange.deserialize(serialized);
+assertEquals(1, deserialized.min());
+assertEquals(2, deserialized.max());
+assertEquals(versionRange, deserialized);
+}
+
+@Test
+public void testDeserializationFailureTest() {
+// min_version can't be < 1.
+Map<String, Long> invalidWithBadMinVersion = new HashMap<String, Long>() {
+{
+put("min_version", 0L);
+put("max_version", 1L);
+}
+};
+assertThrows(
+IllegalArgumentException.class,
+() -> VersionRange.deserialize(invalidWithBadMinVersion));
+
+// max_version can't be < 1.
+Map<String, Long> invalidWithBadMaxVersion = new HashMap<String, Long>() {
+{
+put("min_version", 1L);
+put("max_version", 0L);
+}
+};
+assertThrows(
+IllegalArgumentException.class,
+() -> VersionRange.deserialize(invalidWithBadMaxVersion));
+
+// min_version and max_version can't be < 1.
+Map<String, Long> invalidWithBadMinMaxVersion = new HashMap<String, Long>() {
+{
+put("min_version", 0L);
+put("max_version", 0L);
+}
+};
+assertThrows(
+IllegalArgumentException.class,
+() -> VersionRange.deserialize(invalidWithBadMinMaxVersion));
+
+// min_version can't be > max_version.
+Map<String, Long> invalidWithLowerMaxVersion = new HashMap<String, Long>() {
+{
+put("min_version", 2L);
+put("max_version", 1L);
+}
+};
+assertThrows(
+IllegalArgumentException.class,
+() -> VersionRange.deserialize(invalidWithLowerMaxVersion));
+
+// min_version key missing.
+Map invalidWithMinKeyMissing = n
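
The hierarchy proposed in the review comment above — a `VersionRange` superclass with `minKeyLabel`/`maxKeyLabel` as abstract methods and `SupportedVersionRange`/`FinalizedVersionRange` subclasses — could look roughly like this. The `min_version_level`/`max_version_level` labels for the finalized range are an assumption here, and the real classes carry deserialization logic this sketch omits:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the proposed hierarchy: the serialization key labels become
// abstract methods on the common superclass, so each subclass only
// declares its own labels.
abstract class VersionRange {
    private final long min;
    private final long max;

    VersionRange(long min, long max) {
        // Mirrors the validation exercised by the quoted tests:
        // min and max must be >= 1, and min must not exceed max.
        if (min < 1 || max < 1 || min > max) {
            throw new IllegalArgumentException("invalid range: " + min + ".." + max);
        }
        this.min = min;
        this.max = max;
    }

    abstract String minKeyLabel();
    abstract String maxKeyLabel();

    Map<String, Long> serialize() {
        Map<String, Long> m = new LinkedHashMap<>();
        m.put(minKeyLabel(), min);
        m.put(maxKeyLabel(), max);
        return m;
    }
}

class SupportedVersionRange extends VersionRange {
    SupportedVersionRange(long min, long max) { super(min, max); }
    @Override String minKeyLabel() { return "min_version"; }
    @Override String maxKeyLabel() { return "max_version"; }
}

class FinalizedVersionRange extends VersionRange {
    FinalizedVersionRange(long min, long max) { super(min, max); }
    @Override String minKeyLabel() { return "min_version_level"; }
    @Override String maxKeyLabel() { return "max_version_level"; }
}

public class Main {
    public static void main(String[] args) {
        System.out.println(new SupportedVersionRange(1, 2).serialize()); // prints {min_version=1, max_version=2}
        System.out.println(new FinalizedVersionRange(1, 2).serialize());
    }
}
```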

[jira] [Commented] (KAFKA-10016) Support For Purge Topic

2020-05-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110772#comment-17110772
 ] 

Matthias J. Sax commented on KAFKA-10016:
-

Isn't this already supported via 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+deleteRecordsBefore%28%29+API+in+AdminClient]?

> Support For Purge Topic
> ---
>
> Key: KAFKA-10016
> URL: https://issues.apache.org/jira/browse/KAFKA-10016
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Mollitor
>Priority: Major
>
> Some discussions about how to purge a topic.  Please add native support for 
> this operation.  Is there a "starting offset" for each topic?  Such a vehicle 
> would allow this value to be easily set to the current offset, so that the 
> brokers will skip (and clean) everything before it.
>  
> [https://stackoverflow.com/questions/16284399/purge-kafka-topic]
>  
> {code:none}
> kafka-topics --topic mytopic --purge
> {code}



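The KIP-107 API mentioned above purges by advancing a partition's log start offset rather than rewriting the log. A toy model of that "starting offset" idea from the ticket description — plain Java, not the real Admin API:

```java
import java.util.ArrayList;
import java.util.List;

public class Main {
    // Toy model of a partition log: purging advances the log start offset;
    // records before it are no longer readable and can be cleaned. The real
    // mechanism is the deleteRecords API from KIP-107.
    static long logStartOffset = 0;
    static final List<String> records = new ArrayList<>();

    static long append(String record) {
        records.add(record);
        return records.size() - 1; // offset of the appended record
    }

    // "Purge": move the start offset forward; nothing is rewritten.
    static void deleteRecordsBefore(long offset) {
        if (offset > logStartOffset) {
            logStartOffset = offset;
        }
    }

    // A consumer starting from the beginning sees only offsets >= start.
    static List<String> readAll() {
        return new ArrayList<>(records.subList((int) logStartOffset, records.size()));
    }

    public static void main(String[] args) {
        append("a");
        append("b");
        long next = append("c") + 1;
        deleteRecordsBefore(next); // purge everything appended so far
        append("d");
        System.out.println(readAll()); // prints [d]
    }
}
```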


[jira] [Updated] (KAFKA-10004) KAFKA-10004: ConfigCommand fails to find default broker configs without ZK

2020-05-18 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-10004:
-
Summary: KAFKA-10004: ConfigCommand fails to find default broker configs 
without ZK  (was: using kafka-configs.sh --describe for brokers will have error 
when querying default broker)

> KAFKA-10004: ConfigCommand fails to find default broker configs without ZK
> --
>
> Key: KAFKA-10004
> URL: https://issues.apache.org/jira/browse/KAFKA-10004
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 2.6.0
>
>
> When running
> {code:java}
> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 
> --entity-type brokers
>  {code}
> the output will be:
>  Dynamic configs for broker 0 are: 
>  Dynamic configs for broker  are:
>  *The entity name for brokers must be a valid integer broker id, found: 
> *
>  
> The default entity cannot successfully get the configs.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8677: KAFKA-9999: Make internal topic creation error non-fatal

2020-05-18 Thread GitBox


ableegoldman commented on a change in pull request #8677:
URL: https://github.com/apache/kafka/pull/8677#discussion_r426983346



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -171,7 +173,7 @@ public InternalTopicManager(final Admin adminClient, final 
StreamsConfig streams
 "This can happen if the Kafka cluster is temporary not 
available. " +
 "You can increase admin client config `retries` to be 
resilient against this error.", retries);
 log.error(timeoutAndRetryError);
-throw new StreamsException(timeoutAndRetryError);
+throw new TaskMigratedException("Time out for creating internal 
topics", new TimeoutException(timeoutAndRetryError));

Review comment:
   Actually, I'm not sure we necessarily even _need_ to call on the 
`FallbackPriorTaskAssignor`; we just need to schedule the followup rebalance and 
remove the affected tasks from the assignment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #8675: KAFKA-10004: Fix the default broker configs cannot be displayed when using kafka-configs.sh --describe

2020-05-18 Thread GitBox


cmccabe commented on pull request #8675:
URL: https://github.com/apache/kafka/pull/8675#issuecomment-630526017


   @showuon : thanks for the fix!  You might want to fix your git configuration 
since your email  is showing up as `showuon 
<43372967+show...@users.noreply.github.com>`







[GitHub] [kafka] cmccabe merged pull request #8675: KAFKA-10004: Fix the default broker configs cannot be displayed when using kafka-configs.sh --describe

2020-05-18 Thread GitBox


cmccabe merged pull request #8675:
URL: https://github.com/apache/kafka/pull/8675


   







[GitHub] [kafka] ableegoldman commented on a change in pull request #8677: KAFKA-9999: Make internal topic creation error non-fatal

2020-05-18 Thread GitBox


ableegoldman commented on a change in pull request #8677:
URL: https://github.com/apache/kafka/pull/8677#discussion_r426981901



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -171,7 +173,7 @@ public InternalTopicManager(final Admin adminClient, final 
StreamsConfig streams
 "This can happen if the Kafka cluster is temporary not 
available. " +
 "You can increase admin client config `retries` to be 
resilient against this error.", retries);
 log.error(timeoutAndRetryError);
-throw new StreamsException(timeoutAndRetryError);
+throw new TaskMigratedException("Time out for creating internal 
topics", new TimeoutException(timeoutAndRetryError));

Review comment:
   Good point.. what if we just call on the `FallbackPriorTaskAssignor` 
like we do when `listOffsets` fails, and then remove any tasks that involve 
internal topics we failed to create? And schedule the followup rebalance for 
"immediately" 









[GitHub] [kafka] jiameixie commented on pull request #8692: KAFKA-10018:Change sh to bash

2020-05-18 Thread GitBox


jiameixie commented on pull request #8692:
URL: https://github.com/apache/kafka/pull/8692#issuecomment-630523741


   @ijuma @abbccdda @mjsax @halorgium @astubbs @alexism @glasser  PTAL, thanks







[GitHub] [kafka] cmccabe commented on pull request #8675: KAFKA-10004: Fix the default broker configs cannot be displayed when using kafka-configs.sh --describe

2020-05-18 Thread GitBox


cmccabe commented on pull request #8675:
URL: https://github.com/apache/kafka/pull/8675#issuecomment-630523110


   LGTM







[GitHub] [kafka] ableegoldman edited a comment on pull request #8682: KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll

2020-05-18 Thread GitBox


ableegoldman edited a comment on pull request #8682:
URL: https://github.com/apache/kafka/pull/8682#issuecomment-630522346


   I also think we should reset/clear the set at the beginning of 
`tryToLockAllNonEmptyTaskDirectories`, so basically we're always dealing with 
the current set of actually-locked tasks and don't need to worry about removing 
them during `handleLostAll` or `handleCorruption/Assignment`, etc







[GitHub] [kafka] ableegoldman commented on pull request #8682: KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll

2020-05-18 Thread GitBox


ableegoldman commented on pull request #8682:
URL: https://github.com/apache/kafka/pull/8682#issuecomment-630522346


   I also think we should reset/clear the set at the beginning of 
`tryToLockAllNonEmptyTaskDirectories`







[GitHub] [kafka] jiameixie opened a new pull request #8692: KAFKA-10018:Change sh to bash

2020-05-18 Thread GitBox


jiameixie opened a new pull request #8692:
URL: https://github.com/apache/kafka/pull/8692


   "#!/bin/sh" is used in kafka-server-stop.sh and zookeeper-server-stop.sh, but 
these scripts use `[[`, which is a bash builtin.
   Modern Debian and Ubuntu systems symlink sh to dash by default, so a 
"[[: not found" error occurs.
   Changing "#!/bin/sh" to "#!/bin/bash" avoids this error; modify all scripts 
to use bash.
   
   Change-Id: I733c6e31f76d768e71ac0e040a33da8f4bd8f005
   Signed-off-by: Jiamei Xie 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] guozhangwang commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent

2020-05-18 Thread GitBox


guozhangwang commented on a change in pull request #8681:
URL: https://github.com/apache/kafka/pull/8681#discussion_r426975369



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
 }
 
 if (stores.containsKey(storeName)) {
-throw new IllegalArgumentException(format("%sStore %s has already 
been registered.", logPrefix, storeName));
+log.warn("State Store {} has already been registered, which could 
be due to a half-way registration" +

Review comment:
   +1, we can rely on `storeManager#getStore` inside `StateManagerUtil` to 
check if the store is already registered.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
 }
 
 if (stores.containsKey(storeName)) {
-throw new IllegalArgumentException(format("%sStore %s has already 
been registered.", logPrefix, storeName));
+log.warn("State Store {} has already been registered, which could 
be due to a half-way registration" +
+"in the previous round", storeName);

Review comment:
   nit: we could make the warn log entry clearer that we did not override 
the registered store, e.g. "Skipped registering state store {} 
since it already exists in the state manager, ..."
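A stand-alone sketch of the idempotent registration being discussed (simplified; `StoreRegistry` is illustrative and not the actual `ProcessorStateManager` code): a second registration of the same store name is skipped with a warning instead of throwing `IllegalArgumentException`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of idempotent state store registration: re-registering an
// existing store is a warned no-op rather than an IllegalArgumentException.
// StoreRegistry is illustrative only, not the actual Kafka Streams class.
public class StoreRegistry {
    private final Map<String, Object> stores = new HashMap<>();

    public boolean registerStore(String name, Object store) {
        if (stores.containsKey(name)) {
            System.out.println("Skipped registering state store " + name
                + " since it already exists in the state manager");
            return false; // idempotent: keep the previously registered store
        }
        stores.put(name, store);
        return true;
    }

    public static void main(String[] args) {
        StoreRegistry registry = new StoreRegistry();
        System.out.println(registry.registerStore("counts", new Object())); // true
        System.out.println(registry.registerStore("counts", new Object())); // false
    }
}
```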









[jira] [Created] (KAFKA-10018) Change sh to bash

2020-05-18 Thread jiamei xie (Jira)
jiamei xie created KAFKA-10018:
--

 Summary:  Change sh to bash
 Key: KAFKA-10018
 URL: https://issues.apache.org/jira/browse/KAFKA-10018
 Project: Kafka
  Issue Type: Bug
  Components: admin
Reporter: jiamei xie
Assignee: jiamei xie


"#!/bin/sh" is used in kafka-server-stop.sh and zookeeper-server-stop.sh, but 
these scripts use `[[`, which is a bash builtin.
Modern Debian and Ubuntu systems symlink sh to dash by default, so a 
"[[: not found" error occurs.
Changing "#!/bin/sh" to "#!/bin/bash" avoids this error; modify all scripts 
to use bash.






[GitHub] [kafka] guozhangwang commented on pull request #8682: KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll

2020-05-18 Thread GitBox


guozhangwang commented on pull request #8682:
URL: https://github.com/apache/kafka/pull/8682#issuecomment-630515587


   I agree with @ableegoldman here: after the `while (taskIdIterator.hasNext())` 
loop we can see if there are still remaining tasks, and then log a WARN 
similar to the one at the end of `handleRevocation` before clearing them:
   
   ```
   if (!remainingPartitions.isEmpty()) {
   log.warn("The following partitions {} are missing from the task 
partitions. It could potentially " +
"due to race condition of consumer detecting the 
heartbeat failure, or the tasks " +
"have been cleaned up by the handleAssignment 
callback.", remainingPartitions);
   }
   ```







[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2020-05-18 Thread Vince Mu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110746#comment-17110746
 ] 

Vince Mu commented on KAFKA-6520:
-

[~guozhang] that makes sense. I just want to clarify whether we are adding a 
DISCONNECTED state purely to streamThread or whether we need to add it to the 
KafkaStreams client as well? If it's the latter, would it make sense to 
transition the client to the DISCONNECTED state when all of its streamThreads 
are in the DISCONNECTED state?
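A small sketch of the aggregation rule floated above (hypothetical; KIP-457's DISCONNECTED status is still under discussion, and `ClientState` here is a made-up class): the client would transition to DISCONNECTED only once every stream thread is DISCONNECTED.

```java
import java.util.List;

// Hypothetical aggregation of stream-thread states into a client-level state:
// the client is DISCONNECTED only if all of its threads are DISCONNECTED.
// This is a sketch of the idea in the comment above, not Kafka Streams code.
public class ClientState {
    public enum State { RUNNING, DISCONNECTED }

    public static State aggregate(List<State> threadStates) {
        boolean allDisconnected = !threadStates.isEmpty()
            && threadStates.stream().allMatch(s -> s == State.DISCONNECTED);
        return allDisconnected ? State.DISCONNECTED : State.RUNNING;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(List.of(State.RUNNING, State.DISCONNECTED)));      // RUNNING
        System.out.println(aggregate(List.of(State.DISCONNECTED, State.DISCONNECTED))); // DISCONNECTED
    }
}
```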

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which leads me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the 
> IOException / RuntimeException which indicates the connection disconnected.
>  * And then users of Consumer / Streams can monitor this metric, which 
> normally will only have close-to-zero values since disconnects are transient; 
> if it is spiking, it means the brokers are consistently unavailable, 
> indicating the state.
> [~Yohan123] WDYT?





[GitHub] [kafka] hachikuji commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-05-18 Thread GitBox


hachikuji commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r426944679



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -369,6 +369,31 @@ class GroupCoordinator(val brokerId: Int,
 }
   }
 
+  /**
+   * try to complete produce, fetch and delete requests if the HW of partition 
is incremented. Otherwise, we try to complete
+   * only delayed fetch requests.
+   *
+   * Noted that this method may hold a lot of group lock so the caller should 
NOT hold any group lock
+   * in order to avoid deadlock
+   * @param topicPartitions topic partition and leaderHWIncremented
+   */
+  private[this] def completeDelayedRequests(topicPartitions: 
Map[TopicPartition, Boolean]): Unit =
+topicPartitions.foreach {
+  case (tp, leaderHWIncremented) =>
+if (leaderHWIncremented) 
groupManager.replicaManager.completeDelayedRequests(tp)
+else groupManager.replicaManager.completeDelayedFetchRequests(tp)
+}
+
+  /**
+   * complete the delayed join requests associated to input group keys.
+   *
+   * Noted that this method may hold a lot of group lock so the caller should 
NOT hold any group lock
+   * in order to avoid deadlock
+   * @param groupKeys group keys to complete
+   */
+  private[this] def completeDelayedJoinRequests(groupKeys: Set[GroupKey]): 
Unit =
+groupKeys.foreach(joinPurgatory.checkAndComplete)

Review comment:
   Hmm.. Does the group purgatory suffer from the same deadlock potential? 
If we call `checkAndComplete` for a group "foo," I don't think we would attempt 
completion for any other group.
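The javadoc in the diff above prescribes a common deadlock-avoidance pattern: collect completable work while holding the group lock, then run the completions only after the lock is released. A minimal stand-alone sketch of that pattern (illustrative names such as `DeferredCompletion`; not Kafka code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of "defer completion until the lock is released": mutations happen
// under the group lock, but delayed-request completions collected during the
// mutation run afterwards, so completion code can safely take other locks.
public class DeferredCompletion {
    private final ReentrantLock groupLock = new ReentrantLock();
    private final List<Runnable> pending = new ArrayList<>();

    public void appendUnderLock(Runnable completion) {
        groupLock.lock();
        try {
            // ... mutate group/partition state here ...
            pending.add(completion); // defer: never run completions while locked
        } finally {
            groupLock.unlock();
        }
    }

    // Caller must NOT hold the group lock here, mirroring the javadoc above.
    public int completeDelayedRequests() {
        int completed = 0;
        for (Runnable r : pending) {
            r.run();
            completed++;
        }
        pending.clear();
        return completed;
    }

    public static void main(String[] args) {
        DeferredCompletion d = new DeferredCompletion();
        int[] counter = {0};
        d.appendUnderLock(() -> counter[0]++);
        d.appendUnderLock(() -> counter[0]++);
        System.out.println(d.completeDelayedRequests() + " completed, counter=" + counter[0]);
    }
}
```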

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -970,7 +970,16 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, 
requiredAcks: Int): LogAppendInfo = {
+  /**
+   * @param completeDelayedRequests It may requires a bunch of group locks 
when completing delayed requests so it may
+   *produce deadlock if caller already holds a 
group lock. Hence, caller should pass
+   *false to disable completion and then 
complete the delayed requests after releasing
+   *held group lock
+   */
+  def appendRecordsToLeader(records: MemoryRecords,
+origin: AppendOrigin,
+requiredAcks: Int,
+completeDelayedRequests: Boolean): LogAppendResult 
= {

Review comment:
   Currently we have a somewhat convoluted model where `ReplicaManager` 
creates delayed operations, but we depend on lower level components like 
`Partition` to be aware of them and complete them. This breaks encapsulation. 
   
   Not something we should try to complete in this PR, but as an eventual goal, 
I think we can consider trying to factor delayed operations out of `Partition` 
so that they can be managed by `ReplicaManager` exclusively. If you assume that 
is the end state, then we could drop `completeDelayedRequests` and let 
`ReplicaManager` _always_ be responsible for checking delayed operations after 
appending to the log. 
   
   Other than `ReplicaManager`, the only caller of this method is 
`GroupMetadataManager` which uses it during offset expiration. I think the only 
reason we do this is because we didn't want to waste purgatory space. I don't 
think that's a good enough reason to go outside the normal flow. It would be 
simpler to follow the same path. Potentially we could make the callback an 
`Option` so that we still have a way to avoid polluting the purgatory.

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -769,20 +813,25 @@ class GroupCoordinator(val brokerId: Int,
 // on heartbeat response to eventually notify the rebalance in 
progress signal to the consumer
 val member = group.get(memberId)
 completeAndScheduleNextHeartbeatExpiration(group, member)
-groupManager.storeOffsets(group, memberId, offsetMetadata, 
responseCallback)
+partitionsToComplete ++= groupManager.storeOffsets(
+  group = group,
+  consumerId = memberId,
+  offsetMetadata = offsetMetadata,
+  responseCallback = responseCallback,
+  completeDelayedRequests = false)
 
   case CompletingRebalance =>
 // We should not receive a commit request if the group has not 
completed rebalance;
 // but since the consumer's member.id and generation is valid, it 
means it has received
 // the latest group generation information from the JoinResponse.
 // So let's return a REBALANCE_IN_PROGRESS to let consumer handle 
it gracefully.
 responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.RE

[GitHub] [kafka] ableegoldman commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent

2020-05-18 Thread GitBox


ableegoldman commented on a change in pull request #8681:
URL: https://github.com/apache/kafka/pull/8681#discussion_r426958947



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
 }
 
 if (stores.containsKey(storeName)) {
-throw new IllegalArgumentException(format("%sStore %s has already 
been registered.", logPrefix, storeName));
+log.warn("State Store {} has already been registered, which could 
be due to a half-way registration" +

Review comment:
   I think we might want to skip the re-registration higher up the call 
stack. In `StateManagerUtil#registerStateStores` we call `store.init` on each 
store which ultimately results in this `registerStore` being called









[GitHub] [kafka] xiaodongdu commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-18 Thread GitBox


xiaodongdu commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-630496674


   @xvrl  @rhauch @ijuma  Could you review this PR for KIP-606. Thanks,







[jira] [Assigned] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-18 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao reassigned KAFKA-9989:
--

Assignee: HaiyuanZhao

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: HaiyuanZhao
>Priority: Major
>  Labels: newbie
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which if we take a closer look at, the rebalance happens but has no task 
> assignment. We should fix this problem by making the rebalance result as part 
> of the check, and wait for the finalized assignment (non-empty) before 
> kicking off the record processing validation. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on a change in pull request #8690: KAFKA-9965: Fix uneven distribution in RoundRobinPartitioner

2020-05-18 Thread GitBox


ijuma commented on a change in pull request #8690:
URL: https://github.com/apache/kafka/pull/8690#discussion_r426950830



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
##
@@ -65,12 +65,20 @@ public int partition(String topic, Object key, byte[] 
keyBytes, Object value, by
 }
 
 private int nextValue(String topic) {
-AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
-return new AtomicInteger(0);
-});
+AtomicInteger counter = topicCounterMap.
+computeIfAbsent(topic, k -> new AtomicInteger(0));
 return counter.getAndIncrement();
 }
 
+@Override
+public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
+// After onNewBatch is called, we will call partition() again.
+// So 'rewind' the counter for this topic.
+AtomicInteger counter = topicCounterMap.
+computeIfAbsent(topic, k -> new AtomicInteger(0));
+counter.getAndDecrement();

Review comment:
   Is this an issue for third party partitioners as well?
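The fix in the diff decrements ("rewinds") the per-topic counter in `onNewBatch`, because the producer calls `partition()` again for the record that triggered the new batch. A stand-alone model of that behavior (simplified; `RoundRobinSketch` is illustrative, not the real `RoundRobinPartitioner`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the rewind: without onNewBatch() decrementing the
// counter, the re-invocation of partition() after a new batch would skip a
// partition, producing the uneven distribution described in KAFKA-9965.
public class RoundRobinSketch {
    private final Map<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public int partition(String topic, int numPartitions) {
        int next = topicCounterMap
            .computeIfAbsent(topic, k -> new AtomicInteger(0))
            .getAndIncrement();
        return Math.abs(next) % numPartitions;
    }

    public void onNewBatch(String topic) {
        // partition() will be called again for the same record; undo the increment.
        topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0)).getAndDecrement();
    }

    public static void main(String[] args) {
        RoundRobinSketch p = new RoundRobinSketch();
        int first = p.partition("t", 3);
        p.onNewBatch("t");                  // producer starts a new batch
        int retried = p.partition("t", 3);  // same partition, nothing skipped
        System.out.println(first + "," + retried + "," + p.partition("t", 3)); // 0,0,1
    }
}
```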









[GitHub] [kafka] xiaodongdu opened a new pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-18 Thread GitBox


xiaodongdu opened a new pull request #8691:
URL: https://github.com/apache/kafka/pull/8691


   Implement KIP-606, adding metadata context to MetricsReporter:
   Added a new API to MetricsReporter to allow the client to expose additional 
metadata fields to the reporter plugin. Added an interface, MetricsContext, to 
encapsulate the metadata.
   Deprecated the JmxReporter(String prefix) constructor. The prefix will be 
passed to the reporter via MetricsContext.
   Replaced existing usages of JmxReporter with the default JmxReporter and 
pass the JMX prefix to MetricsContext using _namespace as the key.
   From the Kafka broker, populate MetricsContext with kafka.cluster.id and 
kafka.broker.id.
   From Connect, populate MetricsContext with connect.kafka.cluster.id and 
connect.group.id.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)







[GitHub] [kafka] cmccabe opened a new pull request #8690: KAFKA-9965: Fix uneven distribution in RoundRobinPartitioner

2020-05-18 Thread GitBox


cmccabe opened a new pull request #8690:
URL: https://github.com/apache/kafka/pull/8690


   







[jira] [Updated] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9989:
---
Description: 
System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:

"Never saw output 'processed [0-9]* records' on ubuntu@worker6"

which if we take a closer look at, the rebalance happens but has no task 
assignment. We should fix this problem by making the rebalance result as part 
of the check, and wait for the finalized assignment (non-empty) before kicking 
off the record processing validation. 

  was:
System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:

"Never saw output 'processed [0-9]* records' on ubuntu@worker6"

which if we take a closer look at, the rebalance happens but has no task 
assignment. We should fix this problem by making the rebalance result as part 
of the check, and wait for the finalized assignment before kicking off the 
record processing validation. 


> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which if we take a closer look at, the rebalance happens but has no task 
> assignment. We should fix this problem by making the rebalance result as part 
> of the check, and wait for the finalized assignment (non-empty) before 
> kicking off the record processing validation. 





[jira] [Updated] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9989:
---
Description: 
System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:

"Never saw output 'processed [0-9]* records' on ubuntu@worker6"

which if we take a closer look at, the rebalance happens but has no task 
assignment. We should fix this problem by making the rebalance result as part 
of the check, and wait for the finalized assignment before kicking off the 
record processing validation. 

  was:
System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:

"Never saw output 'processed [0-9]* records' on ubuntu@worker6"

which if we take a closer look at, the rebalance happens but has no task 
assignment. We should fix this problem by making the rebalance result as part 
of the check, and skip the record processing validation when the assignment is 
empty. 


> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which if we take a closer look at, the rebalance happens but has no task 
> assignment. We should fix this problem by making the rebalance result as part 
> of the check, and wait for the finalized assignment before kicking off the 
> record processing validation. 





[jira] [Assigned] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-9989:
--

Assignee: (was: Boyang Chen)

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which if we take a closer look at, the rebalance happens but has no task 
> assignment. We should fix this problem by making the rebalance result as part 
> of the check, and wait for the finalized assignment before kicking off the 
> record processing validation. 





[jira] [Updated] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9989:
---
Labels: newbie  (was: )

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> On closer inspection, the rebalance happens but produces no task
> assignment. We should fix this by making the rebalance result part of the
> check, and wait for the finalized assignment before kicking off the
> record processing validation.





[jira] [Updated] (KAFKA-10010) Should make state store registration idempotent

2020-05-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10010:

Summary: Should make state store registration idempotent  (was: Should 
close standby task for safety during HandleLostAll)

> Should make state store registration idempotent
> ---
>
> Key: KAFKA-10010
> URL: https://issues.apache.org/jira/browse/KAFKA-10010
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> The current lost-all logic doesn't close standby tasks, which could
> potentially lead to a tricky condition like the one below:
> 1. The standby task was initializing in `CREATED` state, and a task corrupted
> exception was thrown from registerStateStores
> 2. The task corrupted exception was caught, and a commit of the unaffected
> tasks was attempted
> 3. The task commit failed due to a task migrated exception
> 4. The handleLostAll didn't close the standby task, leaving it in CREATED
> state
> 5. After the next rebalance completed, the same task was assigned back as a
> standby task.
> 6. An IllegalArgumentException was caught:
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already 
> been registered.
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>         at 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}
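The fix proposed in the ticket's new summary — making state store registration idempotent — can be sketched as below. This is an illustrative toy, not the actual `ProcessorStateManager` API: re-registering a store under the same name becomes a no-op instead of throwing `IllegalArgumentException`.

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentRegistry {
    private final Map<String, Object> stores = new HashMap<>();

    // Idempotent variant: registering an already-known store name returns
    // quietly, so a task re-initialized after handleLostAll does not crash.
    void registerStore(String name, Object store) {
        if (stores.containsKey(name)) {
            return; // already registered; tolerate repeated init
        }
        stores.put(name, store);
    }

    public static void main(String[] args) {
        IdempotentRegistry r = new IdempotentRegistry();
        r.registerStore("KSTREAM-AGGREGATE-STATE-STORE", new Object());
        // Second registration of the same name: no exception is thrown.
        r.registerStore("KSTREAM-AGGREGATE-STATE-STORE", new Object());
        System.out.println("registered stores: " + r.stores.size());
    }
}
```

With this behavior, step 6 of the scenario above (the "has already been registered" crash) cannot occur even if the standby task is left in `CREATED` state.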





[GitHub] [kafka] cadonna commented on a change in pull request #8689: KAFKA-6145: Add unit tests to verify fix of bug KAFKA-9173

2020-05-18 Thread GitBox


cadonna commented on a change in pull request #8689:
URL: https://github.com/apache/kafka/pull/8689#discussion_r426924669



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -170,7 +171,7 @@ public void 
shouldAssignActiveStatefulTasksEvenlyOverUnevenlyDistributedStreamTh
 }
 
 @Test
-public void 
shouldAssignActiveStatefulTasksEvenlyOverClientsWithLessClientsThanTasks() {
+public void 
shouldAssignActiveStatefulTasksEvenlyOverClientsWithMoreClientsThanTasks() {

Review comment:
   This name seemed incorrect.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #8689: KAFKA-6145: Add unit tests to verify fix of bug KAFKA-9173

2020-05-18 Thread GitBox


cadonna commented on pull request #8689:
URL: https://github.com/apache/kafka/pull/8689#issuecomment-630467234


   Call for review: @vvcephei @ableegoldman 







[GitHub] [kafka] cadonna opened a new pull request #8689: KAFKA-6145: Add unit tests to verify fix of bug KAFKA-9173

2020-05-18 Thread GitBox


cadonna opened a new pull request #8689:
URL: https://github.com/apache/kafka/pull/8689


   Unit tests
   - 
shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWithMoreStreamThreadsThanTasks()
   - 
shouldAssignWarmUpTasksIfStatefulActiveTasksBalancedOverStreamThreadsButNotOverClients()
   - 
shouldEvenlyAssignActiveStatefulTasksIfClientsAreWarmedUpToBalanceTaskOverClients()
   verify that bug KAFKA-9173 is fixed with the new 
HighAvailabilityTaskAssignor.
   
   
shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWithMoreStreamThreadsThanTasks()
   ensures that tasks are evenly assigned over clients when all overprovisioned 
clients join
   simultaneously.
   
   
shouldAssignWarmUpTasksIfStatefulActiveTasksBalancedOverStreamThreadsButNotOverClients()
   ensures that warm-up tasks are assigned to two new clients that join the 
group
   although the assignment is already balanced over stream threads.
   
   
shouldEvenlyAssignActiveStatefulTasksIfClientsAreWarmedUpToBalanceTaskOverClients()
   ensures that stateful active tasks are balanced over the previous and warmed-up 
   clients, although the previous assignment is already balanced over stream threads.
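The "evenly assigned over clients" property these tests verify can be sketched with a small hypothetical helper (not the actual assignor code): an assignment is even when per-client task counts differ by at most one.

```java
import java.util.List;

public class BalanceCheck {
    // A task assignment is "balanced over clients" if the maximum and minimum
    // task counts across clients differ by at most one.
    static boolean isBalanced(List<Integer> tasksPerClient) {
        int max = tasksPerClient.stream().mapToInt(Integer::intValue).max().orElse(0);
        int min = tasksPerClient.stream().mapToInt(Integer::intValue).min().orElse(0);
        return max - min <= 1;
    }

    public static void main(String[] args) {
        // 8 tasks over 3 clients: {3, 3, 2} is balanced, {5, 1, 2} is not.
        System.out.println(isBalanced(List.of(3, 3, 2)));
        System.out.println(isBalanced(List.of(5, 1, 2)));
    }
}
```

Balance over stream threads is a separate, finer-grained property, which is why the tests distinguish the two.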
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Commented] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2020-05-18 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110643#comment-17110643
 ] 

Bill Bejeck commented on KAFKA-7271:


[~mjsax] I don't think I'll have time for this one, so I've unassigned myself.

> Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
> ---
>
> Key: KAFKA-7271
> URL: https://issues.apache.org/jira/browse/KAFKA-7271
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 2.6.0
>
>
> Fix in the oldest branch that ignores the test and cherry-pick forward.





[jira] [Assigned] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2020-05-18 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-7271:
--

Assignee: (was: Bill Bejeck)

> Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
> ---
>
> Key: KAFKA-7271
> URL: https://issues.apache.org/jira/browse/KAFKA-7271
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 2.6.0
>
>
> Fix in the oldest branch that ignores the test and cherry-pick forward.





[GitHub] [kafka] guozhangwang merged pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-18 Thread GitBox


guozhangwang merged pull request #8669:
URL: https://github.com/apache/kafka/pull/8669


   







[jira] [Commented] (KAFKA-10010) Should close standby task for safety during HandleLostAll

2020-05-18 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110638#comment-17110638
 ] 

Boyang Chen commented on KAFKA-10010:
-

For more context, the 
[reason|https://github.com/apache/kafka/pull/8440/files#r407722022] we have 
to keep the txn commit before handling task corruption is that otherwise, under 
EOS beta, the stream thread could actually abort other healthy tasks.

> Should close standby task for safety during HandleLostAll
> -
>
> Key: KAFKA-10010
> URL: https://issues.apache.org/jira/browse/KAFKA-10010
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> The current lost-all logic doesn't close standby tasks, which could
> potentially lead to a tricky condition like the one below:
> 1. The standby task was initializing in `CREATED` state, and a task corrupted
> exception was thrown from registerStateStores
> 2. The task corrupted exception was caught, and a commit of the unaffected
> tasks was attempted
> 3. The task commit failed due to a task migrated exception
> 4. The handleLostAll didn't close the standby task, leaving it in CREATED
> state
> 5. After the next rebalance completed, the same task was assigned back as a
> standby task.
> 6. An IllegalArgumentException was caught:
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already 
> been registered.
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>         at 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}





[GitHub] [kafka] ableegoldman commented on pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-18 Thread GitBox


ableegoldman commented on pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#issuecomment-630448469


   Failed due to flaky 
`EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta` and 
`ConnectorTopicsIntegrationTest.testGetActiveTopics`







[jira] [Comment Edited] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-05-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110633#comment-17110633
 ] 

Sophie Blee-Goldman edited comment on KAFKA-10017 at 5/18/20, 9:36 PM:
---

With injectError = false:
h3. Stacktrace

java.lang.AssertionError: Did not receive all 10 records from topic 
multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
greater than <10> but: <5> was less than <10> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
 at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)


was (Author: ableegoldman):
h3. Stacktrace

java.lang.AssertionError: Did not receive all 10 records from topic 
multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
greater than <10> but: <5> was less than <10> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
 at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test, unit-test
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)
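The failing assertion above comes from a wait-until-minimum-records pattern: keep polling until at least N records have arrived or the timeout elapses, then assert on the accumulated total. A minimal sketch of that pattern, with hypothetical names rather than the actual `IntegrationTestUtils` API:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class MinRecordsWait {
    // Hypothetical sketch: accumulate records across polls until `min` are
    // seen or the deadline passes, then fail with a message like the one in
    // the flaky-test report.
    static List<Integer> waitUntilMinRecords(Iterator<Integer> source,
                                             int min, long timeoutMs) {
        List<Integer> received = new ArrayList<>();
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (received.size() < min && System.currentTimeMillis() < deadline
                && source.hasNext()) {
            received.add(source.next());
        }
        if (received.size() < min) {
            throw new AssertionError("Did not receive all " + min
                + " records, got " + received.size());
        }
        return received;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        List<Integer> out = waitUntilMinRecords(input.iterator(), 10, 1000);
        System.out.println("received " + out.size() + " records");
    }
}
```

Flakiness arises when the source genuinely stops short of `min` within the timeout, as in the "<15> was less than <20>" failures quoted here.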





[jira] [Updated] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-05-18 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10017:

Description: 
Creating a new ticket for this since the root cause is different than 
https://issues.apache.org/jira/browse/KAFKA-9966

With injectError = true:
h3. Stacktrace

java.lang.AssertionError: Did not receive all 20 records from topic 
multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
greater than <20> but: <15> was less than <20> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
 at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)

  was:
Creating a new ticket for this since the root cause is different than 
https://issues.apache.org/jira/browse/KAFKA-9966
h3. Stacktrace

java.lang.AssertionError: Did not receive all 20 records from topic 
multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
greater than <20> but: <15> was less than <20> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
 at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)


> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test, unit-test
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)




[jira] [Commented] (KAFKA-10010) Should close standby task for safety during HandleLostAll

2020-05-18 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110636#comment-17110636
 ] 

Boyang Chen commented on KAFKA-10010:
-

Had an offline discussion with the team; so far the action items are:

 # Make the state store registration idempotent to unblock the trunk soak
 # Add logic to avoid aborting the txn when the task is in the initialization 
phase (will get a separate ticket)

 

> Should close standby task for safety during HandleLostAll
> -
>
> Key: KAFKA-10010
> URL: https://issues.apache.org/jira/browse/KAFKA-10010
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> The current lost-all logic doesn't close standby tasks, which could
> potentially lead to a tricky condition like the one below:
> 1. The standby task was initializing in `CREATED` state, and a task corrupted
> exception was thrown from registerStateStores
> 2. The task corrupted exception was caught, and a commit of the unaffected
> tasks was attempted
> 3. The task commit failed due to a task migrated exception
> 4. The handleLostAll didn't close the standby task, leaving it in CREATED
> state
> 5. After the next rebalance completed, the same task was assigned back as a
> standby task.
> 6. An IllegalArgumentException was caught:
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already 
> been registered.
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>         at 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}





[jira] [Updated] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-05-18 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10017:

Summary: Flaky Test 
EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta  (was: Flaky 
Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true])

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test, unit-test
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)





[jira] [Commented] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]

2020-05-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110633#comment-17110633
 ] 

Sophie Blee-Goldman commented on KAFKA-10017:
-

h3. Stacktrace

java.lang.AssertionError: Did not receive all 10 records from topic 
multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
greater than <10> but: <5> was less than <10> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
 at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)

> Flaky Test 
> EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
> -
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test, unit-test
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)





[GitHub] [kafka] navina edited a comment on pull request #8684: KAFKA-10012 Reducing memory overhead associated with strings in Metri…

2020-05-18 Thread GitBox


navina edited a comment on pull request #8684:
URL: https://github.com/apache/kafka/pull/8684#issuecomment-630444877


   @ijuma What issues with `intern()` are you referring to? I know that there 
can be a performance hit when there are a lot of interned strings, but I believe 
the string interning mechanism has been improved in later versions of Java, such 
as JDK 8/9. 
   I would like to understand the concern better before removing the string 
interning shown here.
   Thanks for the quick feedback! 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] navina commented on pull request #8684: KAFKA-10012 Reducing memory overhead associated with strings in Metri…

2020-05-18 Thread GitBox


navina commented on pull request #8684:
URL: https://github.com/apache/kafka/pull/8684#issuecomment-630444877


   @ijuma What issues with intern are you referring to? I know that there can 
be a performance hit when there are a lot of interned strings, but I believe the 
string interning mechanism has been improved in later versions of Java, such as 
JDK 8/9. 
   I would like to understand the concern better before removing the string 
interning shown here.
   Thanks for the quick feedback! 
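As context for this exchange, here is a minimal sketch (not from the PR) of the identity guarantee `String.intern()` provides, which is what makes it attractive for deduplicating repeated metric-name strings, at the cost of pressure on the JVM's intern table:

```java
public class InternDemo {
    public static void main(String[] args) {
        String pooled = "records-consumed-rate";            // literal: already in the string pool
        String built = new String("records-consumed-rate"); // separate heap object, equal content

        System.out.println(built == pooled);          // false: two distinct objects
        System.out.println(built.intern() == pooled); // true: intern() returns the pooled instance
    }
}
```

Equal metric-name strings interned this way collapse to a single pooled instance, which is the memory saving the PR is after.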







[jira] [Resolved] (KAFKA-9292) KIP-551: Expose disk read and write metrics

2020-05-18 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-9292.
-
Fix Version/s: 2.6.0
   Resolution: Fixed

> KIP-551: Expose disk read and write metrics
> ---
>
> Key: KAFKA-9292
> URL: https://issues.apache.org/jira/browse/KAFKA-9292
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 2.6.0
>
>
> It's often helpful to know how many bytes Kafka is reading and writing from 
> the disk.  The reason is because when disk access is required, there may be 
> some impact on latency and bandwidth.  We currently don't have a metric that 
> measures this directly.  It would be useful to add one.
> See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-551%3A+Expose+disk+read+and+write+metrics





[GitHub] [kafka] cmccabe merged pull request #8569: KIP-551: Expose disk read and write metrics

2020-05-18 Thread GitBox


cmccabe merged pull request #8569:
URL: https://github.com/apache/kafka/pull/8569


   







[jira] [Created] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]

2020-05-18 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10017:
---

 Summary: Flaky Test 
EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
 Key: KAFKA-10017
 URL: https://issues.apache.org/jira/browse/KAFKA-10017
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman


Creating a new ticket for this since the root cause is different from 
https://issues.apache.org/jira/browse/KAFKA-9966
h3. Stacktrace

java.lang.AssertionError: Did not receive all 20 records from topic 
multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
greater than <20> but: <15> was less than <20> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
 at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)





[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2020-05-18 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110609#comment-17110609
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-4327:
-

[~mjsax] [~guozhang] I'd like to help close this one, as things have changed 
since it was created:
 * the ZooKeeper dependency has been removed and
 * the zookeeper argument has been deprecated.

This tool carries a dependency on an argument parser that I'm not sure we would 
like to pull into the streams module.

I'd like to propose and agree on the following changes before moving forward:
 * move StreamsResetter to the `tools` module
 * translate the jopt parser (Scala) into argparser (Java)
 * remove the zookeeper parameter

If we agree on this, I can draft a small KIP to get this done.

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support the "topic delete request" and 
> thus, the reset tool will not be backward compatible with all broker versions.





[GitHub] [kafka] abbccdda commented on a change in pull request #8680: KAFKA-9755: Implement read path for feature versioning system (KIP-584)

2020-05-18 Thread GitBox


abbccdda commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r426806321



##
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+private final Map<String, VersionRangeType> features;
+
+/**
+ * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+ * static factory functions for instantiation (see below).
+ *
+ * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+ *   for the Features object.
+ */
+private Features(Map<String, VersionRangeType> features) {
+this.features = features;
+}
+
+/**
+ * @param features   Map of feature name to VersionRange, as the backing 
data structure
+ *   for the Features object.
+ * @return   Returns a new Features object representing 
"supported" features.
+ */
+public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+return new Features<VersionRange>(features);
+}
+
+/**
+ * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+ *   for the Features object.
+ * @return   Returns a new Features object representing 
"finalized" features.
+ */
+public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) {
+return new Features<VersionLevelRange>(features);
+}
+
+public static Features<VersionLevelRange> emptyFinalizedFeatures() {

Review comment:
   Is this function only used in unit test?

##
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+private final Map<String, VersionRangeType> features;
+
+/**
+ * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+ * static factory functions for instantiation (see below).
+ *
+ * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+ *   for the Features object.
+ */
+private Features(Map<String, VersionRangeType> features) {
+this.features = features;
+}
+
+/**
+ * @param features   Map of feature name to VersionRange, as the backing 
data structure
+ *   for the Features object.
+ * @return   Returns a new Features object representing 
"supported" features.
+ */
+public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+return new Features<VersionRange>(features);

Review comment:
   Could be simplified as new Features<>

##
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.

Review comment:
   nit: we could use {@link VersionRangeType} to reference to the classes.

##
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+
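The static-factory pattern these review comments discuss can be sketched in miniature (hypothetical `FeatureMap` class, not Kafka code): a private constructor forces callers through named factory methods, and the diamond operator covers the generic argument as the reviewer suggests.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of the pattern: an immutable map wrapper that can
// only be created through a named static factory method.
public final class FeatureMap<V> {
    private final Map<String, V> features;

    // Private constructor: callers must use a factory, which documents intent.
    private FeatureMap(Map<String, V> features) {
        this.features = Collections.unmodifiableMap(new HashMap<>(features));
    }

    public static <V> FeatureMap<V> supported(Map<String, V> features) {
        return new FeatureMap<>(features); // diamond operator instead of repeating <V>
    }

    public V get(String name) {
        return features.get(name);
    }
}
```

The named factory reads better at call sites than an overloaded constructor, which is the readability argument made in the quoted Javadoc.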

[GitHub] [kafka] hachikuji commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state

2020-05-18 Thread GitBox


hachikuji commented on a change in pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#discussion_r426867775



##
File path: clients/src/main/resources/common/message/ListGroupsRequest.json
##
@@ -20,8 +20,14 @@
   // Version 1 and 2 are the same as version 0.
   //
   // Version 3 is the first flexible version.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds the States flexible field (KIP-518).
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
+{ "name": "States", "type": "[]string", "versions": "4+", "tag": 0, 
"taggedVersions": "4+",

Review comment:
   Sorry I missed this from the discussion, but why are we bumping the 
version if we are only adding tagged fields? Is it so that we can detect 
whether the capability is supported? If so, then I wonder why we don't make 
this a regular field.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##
@@ -26,4 +31,34 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+/**
+ * Only groups in these states will be returned by listConsumerGroups()

Review comment:
   Probably worth adding a comment about broker compatibility with this API.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##
@@ -26,4 +31,34 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+/**
+ * Only groups in these states will be returned by listConsumerGroups()
+ * If not set, all groups are returned without their states
+ * throw IllegalArgumentException if states is empty
+ */
+public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
+if (states == null || states.isEmpty()) {
+throw new IllegalArgumentException("states should not be null or 
empty");
+}
+this.states = Optional.of(states);
+return this;
+}
+
+/**
+ * All groups with their states will be returned by listConsumerGroups()
+ */
+public ListConsumerGroupsOptions inAnyState() {
+this.states = Optional.of(EnumSet.allOf(ConsumerGroupState.class));

Review comment:
   Hmm.. We have an `UNKNOWN` state in `ConsumerGroupState` in case the 
group coordinator adds a new state that the client isn't aware of. Currently 
we're going to pass this through the request, which is a bit odd. Furthermore, 
if the coordinator _does_ add new states, we will be unable to see them using 
this API. I think it might be better to use a `null` list of states in the 
request to indicate that any state is needed.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1397,29 +1398,32 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
-val (error, groups) = groupCoordinator.handleListGroups()
+val listGroupsRequest = request.body[ListGroupsRequest]
+val states = listGroupsRequest.data.states.asScala.toList
+
+def createResponse(throttleMs: Int, groups: List[GroupOverview], error: 
Errors): AbstractResponse = {
+   new ListGroupsResponse(new ListGroupsResponseData()
+.setErrorCode(error.code)
+.setGroups(groups.map { group =>
+val listedGroup = new ListGroupsResponseData.ListedGroup()
+  .setGroupId(group.groupId)
+  .setProtocolType(group.protocolType)
+if (!states.isEmpty)

Review comment:
Why don't we always return the state? I don't think overhead is a huge 
concern for an API like this.









[jira] [Created] (KAFKA-10016) Support For Purge Topic

2020-05-18 Thread David Mollitor (Jira)
David Mollitor created KAFKA-10016:
--

 Summary: Support For Purge Topic
 Key: KAFKA-10016
 URL: https://issues.apache.org/jira/browse/KAFKA-10016
 Project: Kafka
  Issue Type: Improvement
Reporter: David Mollitor


Some discussions have come up about how to purge a topic. Please add native 
support for this operation. Is there a "starting offset" for each topic? Such a 
mechanism would allow this value to be easily set to the current offset, so the 
brokers would skip (and clean) everything before that.

 

[https://stackoverflow.com/questions/16284399/purge-kafka-topic]

 
{code:none}
kafka-topics --topic mytopic --purge
{code}





[GitHub] [kafka] mjsax merged pull request #8687: MINOR: updated MacOS compatibility statement for RocksDB

2020-05-18 Thread GitBox


mjsax merged pull request #8687:
URL: https://github.com/apache/kafka/pull/8687


   







[GitHub] [kafka] ijuma commented on a change in pull request #8685: KAFKA-10014 Always try to close all channels in Selector#close

2020-05-18 Thread GitBox


ijuma commented on a change in pull request #8685:
URL: https://github.com/apache/kafka/pull/8685#discussion_r426850232



##
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##
@@ -363,23 +363,14 @@ public void wakeup() {
 @Override
 public void close() {
List<String> connections = new ArrayList<>(channels.keySet());
-try {
-for (String id : connections)
-close(id);
-} finally {
-// If there is any exception thrown in close(id), we should still 
be able
-// to close the remaining objects, especially the sensors because 
keeping
-// the sensors may lead to failure to start up the 
ReplicaFetcherThread if
-// the old sensors with the same names has not yet been cleaned up.
-AtomicReference<Throwable> firstException = new AtomicReference<>();
-Utils.closeQuietly(nioSelector, "nioSelector", firstException);
-Utils.closeQuietly(sensors, "sensors", firstException);
-Utils.closeQuietly(channelBuilder, "channelBuilder", 
firstException);
-Throwable exception = firstException.get();
-if (exception instanceof RuntimeException && !(exception 
instanceof SecurityException)) {
-throw (RuntimeException) exception;
-}
-
+AtomicReference<Throwable> firstException = new AtomicReference<>();

Review comment:
   Have we considered using `Utils.closeAll` instead of multiple 
`closeQuietly`?
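The pattern under review (close every resource, swallow individual failures, but remember the first exception) can be sketched with a hypothetical stand-in for Kafka's helper; `closeQuietly` below is illustrative, not the real `Utils` implementation:

```java
import java.util.concurrent.atomic.AtomicReference;

public class CloseQuietlyDemo {
    // Hypothetical stand-in for a close-quietly helper: close the resource,
    // swallow the error so later closes still run, but remember the first failure.
    public static void closeQuietly(AutoCloseable c, String name,
                                    AtomicReference<Throwable> firstException) {
        if (c == null) return;
        try {
            c.close();
        } catch (Throwable t) {
            firstException.compareAndSet(null, t); // only the first failure is kept
            System.err.println("Failed to close " + name + ": " + t);
        }
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> first = new AtomicReference<>();
        AutoCloseable ok = () -> { };
        AutoCloseable bad = () -> { throw new IllegalStateException("boom"); };
        // All three close attempts run even though the middle one fails.
        closeQuietly(ok, "nioSelector", first);
        closeQuietly(bad, "sensors", first);
        closeQuietly(ok, "channelBuilder", first);
        System.out.println("first failure: " + first.get());
    }
}
```

The key property is that one failing close cannot prevent the remaining resources from being closed, which is exactly the concern in the quoted `Selector#close` comment.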









[GitHub] [kafka] ijuma commented on a change in pull request #8685: KAFKA-10014 Always try to close all channels in Selector#close

2020-05-18 Thread GitBox


ijuma commented on a change in pull request #8685:
URL: https://github.com/apache/kafka/pull/8685#discussion_r426850232



##
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##
@@ -363,23 +363,14 @@ public void wakeup() {
 @Override
 public void close() {
List<String> connections = new ArrayList<>(channels.keySet());
-try {
-for (String id : connections)
-close(id);
-} finally {
-// If there is any exception thrown in close(id), we should still 
be able
-// to close the remaining objects, especially the sensors because 
keeping
-// the sensors may lead to failure to start up the 
ReplicaFetcherThread if
-// the old sensors with the same names has not yet been cleaned up.
-AtomicReference<Throwable> firstException = new AtomicReference<>();
-Utils.closeQuietly(nioSelector, "nioSelector", firstException);
-Utils.closeQuietly(sensors, "sensors", firstException);
-Utils.closeQuietly(channelBuilder, "channelBuilder", 
firstException);
-Throwable exception = firstException.get();
-if (exception instanceof RuntimeException && !(exception 
instanceof SecurityException)) {
-throw (RuntimeException) exception;
-}
-
+AtomicReference<Throwable> firstException = new AtomicReference<>();

Review comment:
   Have we considered using `Utils.closeAll`?









[GitHub] [kafka] ijuma commented on pull request #8684: KAFKA-10012 Reducing memory overhead associated with strings in Metri…

2020-05-18 Thread GitBox


ijuma commented on pull request #8684:
URL: https://github.com/apache/kafka/pull/8684#issuecomment-630391729


   Thanks for the PR. Java's built-in string interning mechanism is known to 
have issues, so I'm not sure we want to do that. Maybe we can remove that part 
of the change from this PR?







[GitHub] [kafka] cmccabe commented on pull request #8675: KAFKA-10004: Fix the default broker configs cannot be displayed when using kafka-configs.sh --describe

2020-05-18 Thread GitBox


cmccabe commented on pull request #8675:
URL: https://github.com/apache/kafka/pull/8675#issuecomment-630389772


   ok to test







[GitHub] [kafka] apovzner commented on pull request #8650: MINOR: Added unit tests for ConnectionQuotas

2020-05-18 Thread GitBox


apovzner commented on pull request #8650:
URL: https://github.com/apache/kafka/pull/8650#issuecomment-630379942


   It looks like the build couldn't even run tests:
   ```
   15:15:09 ERROR: Error cloning remote repo 'origin'
   ...
   15:15:18 stderr: fatal: Unable to look up github.com (port 9418) (Name or 
service not known)
   ```







[GitHub] [kafka] ijuma closed pull request #8688: MINOR: Introduce separate methods in KafkaApis for consumer and follower fetch handling

2020-05-18 Thread GitBox


ijuma closed pull request #8688:
URL: https://github.com/apache/kafka/pull/8688


   







[GitHub] [kafka] ijuma commented on pull request #8688: MINOR: Introduce separate methods in KafkaApis for consumer and follower fetch handling

2020-05-18 Thread GitBox


ijuma commented on pull request #8688:
URL: https://github.com/apache/kafka/pull/8688#issuecomment-630376046


   Closing this for now as there may be a better way to achieve this.







[GitHub] [kafka] guozhangwang commented on pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-18 Thread GitBox


guozhangwang commented on pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#issuecomment-630370157


   test this please







[jira] [Commented] (KAFKA-10010) Should close standby task for safety during HandleLostAll

2020-05-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110522#comment-17110522
 ] 

Sophie Blee-Goldman commented on KAFKA-10010:
-

When I first started looking into the store registration and initialization 
logic for that PR, I remember thinking there was a bug, since we would attempt 
to re-register stores if we hit an exception halfway through registration. I 
snooped around and it seemed like there wasn't really a way to hit this bug, 
but I fixed it anyway.

Seems like there actually was a way to hit this bug after all, so nice catch 
[~bchen225242]

> Should close standby task for safety during HandleLostAll
> -
>
> Key: KAFKA-10010
> URL: https://issues.apache.org/jira/browse/KAFKA-10010
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> The current lost all logic doesn't close standby task, which could 
> potentially lead to a tricky condition like below:
> 1. The standby task was initializing as `CREATED` state, and task corrupted 
> exception was thrown from registerStateStores
> 2. The task corrupted exception was caught, and do a non-affected task commit
> 3. The task commit failed due to task migrated exception
> 4. The handleLostAll didn't close the standby task, leaving it as CREATED 
> state
> 5. Next rebalance complete, the same task was assigned back as standby task.
> 6. Illegal Argument exception caught :
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already 
> been registered.
>         at 
[jira] [Commented] (KAFKA-10010) Should close standby task for safety during HandleLostAll

2020-05-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110518#comment-17110518
 ] 

Sophie Blee-Goldman commented on KAFKA-10010:
-

It's possible the active <-> standby task conversion PR would actually fix this 
on the side, as it skips re-registering any store that's already registered. 
I'd like to avoid closing standbys during handleLostAll, since that would 
completely clear out any in-memory stores, for example.

> Should close standby task for safety during HandleLostAll
> -
>
> Key: KAFKA-10010
> URL: https://issues.apache.org/jira/browse/KAFKA-10010
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> The current lost-all logic doesn't close standby tasks, which could 
> potentially lead to a tricky sequence like the one below:
> 1. The standby task was initializing in `CREATED` state, and a task corrupted 
> exception was thrown from registerStateStores.
> 2. The task corrupted exception was caught, and a commit of the non-affected 
> tasks was performed.
> 3. The task commit failed due to a task migrated exception.
> 4. handleLostAll didn't close the standby task, leaving it in CREATED 
> state.
> 5. The next rebalance completed, and the same task was assigned back as a standby task.
> 6. An IllegalArgumentException was caught:
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already 
> been registered.
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>         at 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}
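The failure in step 6 above boils down to a registry that rejects double registration. A minimal sketch (all names hypothetical, not Kafka's actual classes) of why skipping close() during handleLostAll leads to the IllegalArgumentException in the stack trace:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for a task's state-store registry: registering the
// same store twice throws, and only closing the task clears the registry.
public class StateRegistrySketch {
    private final Map<String, Boolean> stores = new HashMap<>();

    public void registerStore(String name) {
        if (stores.containsKey(name)) {
            throw new IllegalArgumentException(
                "Store " + name + " has already been registered.");
        }
        stores.put(name, true);
    }

    public void close() {
        // Closing the task unregisters its stores; handleLostAll skipping
        // this step is what leaves the store registered.
        stores.clear();
    }

    public static void main(String[] args) {
        StateRegistrySketch task = new StateRegistrySketch();
        task.registerStore("agg-store");
        // handleLostAll never called close(), so re-initialization after the
        // next rebalance hits the already-registered store:
        try {
            task.registerStore("agg-store");
        } catch (IllegalArgumentException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Closing (or, as the comment above suggests, skipping re-registration of already-registered stores) breaks the cycle either way.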



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma opened a new pull request #8688: MINOR: Introduce separate methods in KafkaApis for consumer and follower fetch handling

2020-05-18 Thread GitBox


ijuma opened a new pull request #8688:
URL: https://github.com/apache/kafka/pull/8688


   This is a bit odd in that it's not needed from a semantics perspective,
   but it would make it much easier to distinguish the cost of follower
   fetches versus consumer fetches when profiling.
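   The refactor is purely for attribution: two thin, distinctly named wrappers
   around the same logic give a profiler separate stack frames per caller type.
   A minimal sketch (hypothetical names, not the actual KafkaApis methods):

```java
// Both wrappers are semantically identical, but each shows up as its own
// frame in profiler output, so follower vs. consumer fetch cost can be
// separated without changing behavior.
public class FetchSplitSketch {
    // Shared fetch-handling logic (trivial placeholder here).
    static long handleFetch(long bytes) {
        return bytes;
    }

    static long handleConsumerFetch(long bytes) {
        return handleFetch(bytes);
    }

    static long handleFollowerFetch(long bytes) {
        return handleFetch(bytes);
    }

    public static void main(String[] args) {
        System.out.println(handleConsumerFetch(10) + handleFollowerFetch(20));
    }
}
```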
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #8569: KIP-551: Expose disk read and write metrics

2020-05-18 Thread GitBox


cmccabe commented on pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#issuecomment-630352959


   @mumrah: Good question. I don't think anyone has looked at Sigar. I 
guess the question is whether we want to get into the business of doing 
general-purpose node monitoring. I think many people would say no. We're 
adding this metric mainly because it's very simple to check, and also very 
impactful for Kafka (the onset of heavy disk reads often correlates with 
performance tanking).
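
   On Linux, the kind of per-process counters a "very simple to check" disk
metric can use live in /proc/self/io (read_bytes / write_bytes). A hedged
sketch of parsing them (illustrative names, not Kafka's implementation):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

// Parse a /proc/<pid>/io-style body ("key: value" per line) and pull out
// one counter. Keeping the parser pure makes it testable off-Linux.
public class DiskIoSketch {
    public static Optional<Long> parseIoField(String procIoBody, String key) {
        for (String line : procIoBody.split("\n")) {
            String[] parts = line.split(":\\s*");
            if (parts.length == 2 && parts[0].equals(key)) {
                return Optional.of(Long.parseLong(parts[1].trim()));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        Path io = Path.of("/proc/self/io");
        if (Files.isReadable(io)) {
            String body = Files.readString(io);
            System.out.println("read_bytes="
                + parseIoField(body, "read_bytes").orElse(-1L));
        } else {
            System.out.println("/proc/self/io not available on this platform");
        }
    }
}
```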







[jira] [Commented] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2020-05-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110508#comment-17110508
 ] 

Sophie Blee-Goldman commented on KAFKA-6579:


No, I looked into it but the scope is nontrivial. I'll unassign it and maybe 
someone from the community can pick it up ^^

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie, unit-test
>
> For key-value stores, we have an {{AbstractKeyValueStoreTest}} that is shared 
> among all implementations; however, for window and session stores, each 
> class has its own independent unit test class that does not share test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It would be better to use the same pattern as for key-value stores and consolidate 
> these test functions into a shared base class.
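
The consolidation asked for here is the template-method pattern already used for
key-value stores: shared assertions live in an abstract base, and each store
implementation overrides only a factory method. A minimal sketch (plain Java
with illustrative names; the real tests use JUnit):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a shared test base class: the abstract class owns the test
// logic, and each store implementation supplies only createStore().
public class SharedStoreTestSketch {
    interface WindowStore {
        void put(String k, String v);
        String get(String k);
    }

    // Analogous to AbstractKeyValueStoreTest: assertions written once.
    abstract static class AbstractWindowStoreTest {
        abstract WindowStore createStore(); // each implementation overrides

        void shouldPutAndGet() {
            WindowStore store = createStore();
            store.put("k", "v");
            if (!"v".equals(store.get("k"))) {
                throw new AssertionError("put/get failed");
            }
        }
    }

    static class InMemoryWindowStore implements WindowStore {
        private final Map<String, String> map = new HashMap<>();
        public void put(String k, String v) { map.put(k, v); }
        public String get(String k) { return map.get(k); }
    }

    public static void main(String[] args) {
        // A concrete "test class" is reduced to a factory override.
        new AbstractWindowStoreTest() {
            WindowStore createStore() { return new InMemoryWindowStore(); }
        }.shouldPutAndGet();
        System.out.println("shared test passed for InMemoryWindowStore");
    }
}
```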





[jira] [Assigned] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2020-05-18 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-6579:
--

Assignee: (was: Sophie Blee-Goldman)

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie, unit-test
>




