[GitHub] [kafka] showuon commented on pull request #8623: MINOR: Update the documentations
showuon commented on pull request #8623: URL: https://github.com/apache/kafka/pull/8623#issuecomment-630618738 Thank you, @kkonstantine for many nice catches!! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on pull request #8623: MINOR: Update the documentations
kkonstantine commented on pull request #8623: URL: https://github.com/apache/kafka/pull/8623#issuecomment-630618055 ok to test
[GitHub] [kafka] kkonstantine commented on a change in pull request #8623: MINOR: Update the documentations
kkonstantine commented on a change in pull request #8623: URL: https://github.com/apache/kafka/pull/8623#discussion_r427064231 ## File path: docs/ops.html ## @@ -477,19 +477,25 @@ Limiting Bandwidth Usage during Da Throttle was removed. The administrator can also validate the assigned configs using the kafka-configs.sh. There are two pairs of throttle - configuration used to manage the throttling process. The throttle value itself. This is configured, at a broker + configuration used to manage the throttling process. First pair refers to the throttle value itself. This is configured, at a broker level, using the dynamic properties: - leader.replication.throttled.rate - follower.replication.throttled.rate + +leader.replication.throttled.rate +follower.replication.throttled.rate + + + Then there is the configuration pair of enumerated set of throttled replicas: Review comment: ```suggestion Then there is the configuration pair of enumerated sets of throttled replicas: ```
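The two throttle-rate properties discussed in this review are dynamic broker configs, so an administrator can set and then validate them with kafka-configs.sh, as the ops.html section suggests. A minimal sketch (the broker id, bootstrap address, and the 50 MB/s rate are hypothetical values, not taken from the PR):

```shell
# Set the throttle value on broker 0 (example values; adjust for your cluster)
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type brokers --entity-name 0 \
  --add-config 'leader.replication.throttled.rate=50000000,follower.replication.throttled.rate=50000000'

# Validate the assigned configs, as the docs recommend
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe \
  --entity-type brokers --entity-name 0
```

These commands require a running broker, so they are shown here only as a CLI sketch of the properties the doc text enumerates.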
[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations
showuon commented on a change in pull request #8623: URL: https://github.com/apache/kafka/pull/8623#discussion_r427059479 ## File path: docs/ops.html ## @@ -477,16 +477,20 @@ Limiting Bandwidth Usage during Da Throttle was removed. The administrator can also validate the assigned configs using the kafka-configs.sh. There are two pairs of throttle - configuration used to manage the throttling process. The throttle value itself. This is configured, at a broker + configuration used to manage the throttling process. This is configured, at a broker level, using the dynamic properties: - leader.replication.throttled.rate - follower.replication.throttled.rate + +leader.replication.throttled.rate +follower.replication.throttled.rate + There is also an enumerated set of throttled replicas: Review comment: Good suggestion! After the update, it's much clearer! ![image](https://user-images.githubusercontent.com/43372967/82292384-6506fb80-99dd-11ea-8e99-15ed86db1186.png)
[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations
showuon commented on a change in pull request #8623: URL: https://github.com/apache/kafka/pull/8623#discussion_r427054507 ## File path: docs/connect.html ## @@ -129,9 +129,11 @@ TransformationsThe file source connector reads each line as a String. We will wrap each line in a Map and then add a second field to identify the origin of the event. To do this, we use two transformations: HoistField to place the input line inside a Map -InsertField to add the static field. In this example we'll indicate that the record came from a file connector Review comment: Yes, you're right! After a second reading, it indeed refers to the content coming from a file in `InsertField`. I'll revert this change. Thanks.
[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations
showuon commented on a change in pull request #8623: URL: https://github.com/apache/kafka/pull/8623#discussion_r427051965 ## File path: docs/connect.html ## @@ -103,7 +103,7 @@ Configuring Connecto topics.regex - A Java regular expression of topics to use as input for this connector -For any other options, you should consult the documentation for the connector. +For any other options, you should consult the documentation for the connector. Review comment: Yes, you're right. I read it again, and they indeed refer to the docs of each individual connector. I'll revert this change. Thanks.
[GitHub] [kafka] kkonstantine commented on a change in pull request #8623: MINOR: Update the documentations
kkonstantine commented on a change in pull request #8623: URL: https://github.com/apache/kafka/pull/8623#discussion_r427036583 ## File path: docs/connect.html ## @@ -129,9 +129,11 @@ TransformationsThe file source connector reads each line as a String. We will wrap each line in a Map and then add a second field to identify the origin of the event. To do this, we use two transformations: HoistField to place the input line inside a Map -InsertField to add the static field. In this example we'll indicate that the record came from a file connector Review comment: This is also not correct. This refers to what we say above as: _then add a second field to identify the origin of the event_ So this correctly refers to what will be added in this new field when using `InsertField` ## File path: docs/security.html ## @@ -398,7 +398,7 @@ Host Name Verification ssl.keystore.password=test1234 ssl.key.password=test1234 -Other configuration settings that may also be needed depending on our requirements and the broker configuration: +Other configuration settings that may also be needed depending on requirements and the broker configuration: Review comment: I'm not sure this is redundant. This refers to the user's requirements. ## File path: docs/connect.html ## @@ -103,7 +103,7 @@ Configuring Connecto topics.regex - A Java regular expression of topics to use as input for this connector -For any other options, you should consult the documentation for the connector. +For any other options, you should consult the documentation for the connector. Review comment: That's not accurate. The documentation for the connector is not the same as the Worker configs. This indeed refers to the docs of each individual connector ## File path: docs/ops.html ## @@ -477,16 +477,20 @@ Limiting Bandwidth Usage during Da Throttle was removed. The administrator can also validate the assigned configs using the kafka-configs.sh. 
There are two pairs of throttle - configuration used to manage the throttling process. The throttle value itself. This is configured, at a broker + configuration used to manage the throttling process. This is configured, at a broker Review comment: This is not redundant. It refers to the two pairs of properties shown below. You can say instead: ```suggestion configuration used to manage the throttling process. First pair refers to the throttle value itself. This is configured, at a broker ``` then below you could amend the sentence to say: _There is also an enumerated set of throttled replicas:_ -> _Then there is the configuration pair of enumerated set of throttled replicas:_ ## File path: docs/ops.html ## @@ -477,16 +477,20 @@ Limiting Bandwidth Usage during Da Throttle was removed. The administrator can also validate the assigned configs using the kafka-configs.sh. There are two pairs of throttle - configuration used to manage the throttling process. The throttle value itself. This is configured, at a broker + configuration used to manage the throttling process. This is configured, at a broker level, using the dynamic properties: - leader.replication.throttled.rate - follower.replication.throttled.rate + +leader.replication.throttled.rate +follower.replication.throttled.rate + There is also an enumerated set of throttled replicas: Review comment: Then here you could amend the sentence to say: ```suggestion Then there is the configuration pair of throttled replicas: ``` ## File path: docs/ops.html ## @@ -477,16 +477,20 @@ Limiting Bandwidth Usage during Da Throttle was removed. The administrator can also validate the assigned configs using the kafka-configs.sh. There are two pairs of throttle - configuration used to manage the throttling process. The throttle value itself. This is configured, at a broker + configuration used to manage the throttling process. 
This is configured, at a broker level, using the dynamic properties: - leader.replication.throttled.rate - follower.replication.throttled.rate + +leader.replication.throttled.rate +follower.replication.throttled.rate + There is also an enumerated set of throttled replicas: - leader.replication.throttled.replicas - follower.replication.throttled.replicas + +leader.replication.throttled.replicas +follower.replication.throttled.replicas + Which are configured per topic. All four config values are automatically assigned by kafka-reassign-partitions.sh Review comment: Then you can add a break here: ```suggestion Which are configured per topic. All four config values are automatically assigned
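The second configuration pair is set at the topic level; for manual throttling outside of a reassignment it could be applied with kafka-configs.sh as well. A hedged sketch (the topic name and the partitionId:brokerId pairs are made up for illustration; as the doc text above notes, all four values are assigned automatically by kafka-reassign-partitions.sh during a reassignment):

```shell
# Enumerate throttled replicas per topic as partitionId:brokerId pairs
# (a wildcard * throttles all replicas of the topic)
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config 'leader.replication.throttled.replicas=0:101,1:101,follower.replication.throttled.replicas=0:102,1:102'
```

Like the broker-level sketch, this needs a live cluster and is only a CLI illustration of the properties being documented.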
[GitHub] [kafka] showuon commented on pull request #8623: MINOR: Update the documentations
showuon commented on pull request #8623: URL: https://github.com/apache/kafka/pull/8623#issuecomment-630580908 Hi @cmccabe @guozhangwang @gwenshap, a simple doc fix/update to connect.html/ops.html/security.html. Please help review. Thanks.
[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation
showuon commented on a change in pull request #8622: URL: https://github.com/apache/kafka/pull/8622#discussion_r427028165 ## File path: docs/streams/upgrade-guide.html ## @@ -35,7 +35,7 @@ Upgrade Guide and API Changes Upgrading from any older version to {{fullDotVersion}} is possible: you will need to do two rolling bounces, where during the first rolling bounce phase you set the config upgrade.from="older version" -(possible values are "0.10.0" - "2.4") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager +(possible values are "0.10.0" - "2.3") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager Review comment: I've also updated in the kafka-site repo: https://github.com/apache/kafka-site/pull/265/commits/513a8205e6b115ca4f876aa5d95d3756061266d5. Thank you.
[jira] [Updated] (KAFKA-10020) KIP-616: Rename implicit Serdes instances in kafka-streams-scala
[ https://issues.apache.org/jira/browse/KAFKA-10020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuriy Badalyantc updated KAFKA-10020: - Component/s: streams Affects Version/s: 2.5.0 Priority: Minor (was: Major) > KIP-616: Rename implicit Serdes instances in kafka-streams-scala > > > Key: KAFKA-10020 > URL: https://issues.apache.org/jira/browse/KAFKA-10020 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: Yuriy Badalyantc >Priority: Minor > > To fix name clash, names of implicits in the > org.apache.kafka.streams.scala.Serdes should be changed. Details are in the > [KIP-616|https://cwiki.apache.org/confluence/display/KAFKA/KIP-616%3A+Rename+implicit+Serdes+instances+in+kafka-streams-scala]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10020) KIP-616: Rename implicit Serdes instances in kafka-streams-scala
Yuriy Badalyantc created KAFKA-10020: Summary: KIP-616: Rename implicit Serdes instances in kafka-streams-scala Key: KAFKA-10020 URL: https://issues.apache.org/jira/browse/KAFKA-10020 Project: Kafka Issue Type: Improvement Reporter: Yuriy Badalyantc To fix name clash, names of implicits in the org.apache.kafka.streams.scala.Serdes should be changed. Details are in the [KIP-616|https://cwiki.apache.org/confluence/display/KAFKA/KIP-616%3A+Rename+implicit+Serdes+instances+in+kafka-streams-scala].
[GitHub] [kafka] LMnet commented on a change in pull request #8049: MINOR: Added missing default serdes to the streams.scala.Serdes
LMnet commented on a change in pull request #8049: URL: https://github.com/apache/kafka/pull/8049#discussion_r427015288 ## File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala ## @@ -30,12 +32,15 @@ object Serdes { implicit def JavaLong: Serde[java.lang.Long] = JSerdes.Long() implicit def ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray() implicit def Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes() + implicit def byteBufferSerde: Serde[ByteBuffer] = JSerdes.ByteBuffer() + implicit def shortSerde: Serde[Short] = JSerdes.Short().asInstanceOf[Serde[Short]] implicit def Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]] implicit def JavaFloat: Serde[java.lang.Float] = JSerdes.Float() implicit def Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]] implicit def JavaDouble: Serde[java.lang.Double] = JSerdes.Double() implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]] implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer() + implicit def uuidSerde: Serde[UUID] = JSerdes.UUID() Review comment: I created the KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-616%3A+Rename+implicit+Serdes+instances+in+kafka-streams-scala
[jira] [Updated] (KAFKA-10019) MirrorMaker 2 did not function properly after restart (message lost, messages arriving slowly)
[ https://issues.apache.org/jira/browse/KAFKA-10019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kay updated KAFKA-10019: Description: MM2 did not function properly after stopping a running MM2 process then starting it again. Consumer did not receive all messages (even messages being sent after MM2 restarted). The messages arriving to the consumer were no longer at the rate as specified in "--message" and "--timeout". To reproduce the issue # Environment: ## Region 1: one Kafka cluster, two MM2 instances, 1 producer instance ## Region 2: one Kafka cluster, two MM2 instances, 1 consumer instance # Producer (in region 1) started sending 1000 messages. ## ./bin/kafka-producer-perf-test.sh --producer.config config/producer.properties --topic topic1 --record-size 480 --num-records 1000 --throughput 17 # Consumer (in region 2) started receiving messages. ## while true; do ./bin/kafka-consumer-perf-test.sh --threads 60 *--timeout 5000* --consumer.config config/consumer.properties --topic region1.topic1 *--messages 250* --group region2-consume-region1topic1 --broker-list $KAFKA_BROKERS; done > consumer.log & # Consumer received the first 500 messages (250, 250), as "--message" specified. # Killed the MM2 process on one of two instances in both regions. # Consumer started receiving the remaining messages at a much slower "rate" (160, 29, 19, 11, 9, 6, 5, 5, 0,.. 3, 0,... 2, 0,... 1). # Restarted the MM2 processes killed at (4). # Producer sent another 1000 messages. # Still, messages no longer arrived at the "--message" rate (250 * N), but e.g. 37, 30, 23, 13, 9, 0, 1, 3... # And consumer did not receive all new 1000 messages sent after MM2 restarted. Please see the producer and consumer log files attached. In the consumer log file, you can see that after the first 2 consecutive "250" messages arrived, the message arrived differently. *Issue Summary* # MM2 does not recover from restarting its process. 
# After killing a MM2 process in the MM2 EC2 instance, a Consumer no longer received the messages at the rate of "--message" and "--timeout". # Consumer did not receive all messages even those messages were published after the mm2 process restarted. # Consumer no longer received messages at the rate of "--message" and "-timeout" even after the mm2 process restarted. was: MM2 did not function properly after stopping a running MM2 process then starting it again. Consumer did not receive all messages (even messages being sent after MM2 restarted). The messages arriving to the consumer were no longer at the rate as specified in "--message" and "--timeout". To reproduce the issue # Environment: ## Region 1: one Kafka cluster, two MM2 instances, 1 producer instance ## Region 2: one Kafka cluster, two MM2 instances, 1 consumer instance # **Producer (in region 1) started sending 1000 messages. ## ./bin/kafka-producer-perf-test.sh --producer.config config/producer.properties --topic topic1 --record-size 480 --num-records 1000 --throughput 17 # Consumer (in region 2) started receiving messages. ## while true; do ./bin/kafka-consumer-perf-test.sh --threads 60 *--timeout 5000* --consumer.config config/consumer.properties --topic region1.topic1 *--messages 250* --group region2-consume-region1topic1 --broker-list $KAFKA_BROKERS; done > consumer.log & # Consumer received the first 500 messages (250, 250), as "--message" specified. # Killed the MM2 process on one of two instances in both regions. # Consumer started receiving the remaining messages at a much slower "rate" (160, 29, 19, 11, 9, 6, 5, 5, 0,.. 3, 0,... 2, 0,... 1). # Restarted the MM2 processes killed at (4). # Producer sent another 1000 messages. # Still, messages no longer arrived at the "--message" rate (250 * N), but e.g. 37, 30, 23, 13, 9, 0, 1, 3... # And consumer did not receive all new 1000 messages sent after MM2 restarted. Please see the producer and consumer log files attached. 
In the consumer log file, you can see that after the first 2 consecutive "250" messages arrived, the message arrived differently. *Issue Summary* # MM2 does not recover from restarting its process. # After killing a MM2 process in the MM2 EC2 instance, a Consumer no longer received the messages at the rate of "--message" and "--timeout". # Consumer did not receive all messages even those messages were published after the mm2 process restarted. # Consumer no longer received messages at the rate of "--message" and "-timeout" even after the mm2 process restarted. > MirrorMaker 2 did not function properly after restart (message lost, messages > arriving slowly) > -- > > Key: KAFKA-10019 > URL: https://issues.apache.org/jira/browse/KAFKA-10019 > Project: Kafka > Issue Type: Bug >
[jira] [Created] (KAFKA-10019) MirrorMaker 2 did not function properly after restart (message lost, messages arriving slowly)
Kay created KAFKA-10019: --- Summary: MirrorMaker 2 did not function properly after restart (message lost, messages arriving slowly) Key: KAFKA-10019 URL: https://issues.apache.org/jira/browse/KAFKA-10019 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 2.4.1 Environment: Amazon Linux 2 MSK clusters: kafka.m5.large, 3 AZ, 3 brokers MM2 instances: c5.2xlarge Producer/Consumer instances: c5.2xlarge Reporter: Kay Attachments: 2a-consumer.log, 2a-producer.log MM2 did not function properly after stopping a running MM2 process then starting it again. Consumer did not receive all messages (even messages being sent after MM2 restarted). The messages arriving to the consumer were no longer at the rate as specified in "--message" and "--timeout". To reproduce the issue # Environment: ## Region 1: one Kafka cluster, two MM2 instances, 1 producer instance ## Region 2: one Kafka cluster, two MM2 instances, 1 consumer instance # **Producer (in region 1) started sending 1000 messages. ## ./bin/kafka-producer-perf-test.sh --producer.config config/producer.properties --topic topic1 --record-size 480 --num-records 1000 --throughput 17 # Consumer (in region 2) started receiving messages. ## while true; do ./bin/kafka-consumer-perf-test.sh --threads 60 *--timeout 5000* --consumer.config config/consumer.properties --topic region1.topic1 *--messages 250* --group region2-consume-region1topic1 --broker-list $KAFKA_BROKERS; done > consumer.log & # Consumer received the first 500 messages (250, 250), as "--message" specified. # Killed the MM2 process on one of two instances in both regions. # Consumer started receiving the remaining messages at a much slower "rate" (160, 29, 19, 11, 9, 6, 5, 5, 0,.. 3, 0,... 2, 0,... 1). # Restarted the MM2 processes killed at (4). # Producer sent another 1000 messages. # Still, messages no longer arrived at the "--message" rate (250 * N), but e.g. 37, 30, 23, 13, 9, 0, 1, 3... 
# And consumer did not receive all new 1000 messages sent after MM2 restarted. Please see the producer and consumer log files attached. In the consumer log file, you can see that after the first 2 consecutive "250" messages arrived, the messages arrived differently. *Issue Summary* # MM2 does not recover from restarting its process. # After killing a MM2 process in the MM2 EC2 instance, a Consumer no longer received the messages at the rate of "--message" and "--timeout". # Consumer did not receive all messages even when those messages were published after the mm2 process restarted. # Consumer no longer received messages at the rate of "--message" and "--timeout" even after the mm2 process restarted.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8622: MINOR: Update stream documentation
ableegoldman commented on a change in pull request #8622: URL: https://github.com/apache/kafka/pull/8622#discussion_r427019844 ## File path: docs/streams/upgrade-guide.html ## @@ -35,7 +35,7 @@ Upgrade Guide and API Changes Upgrading from any older version to {{fullDotVersion}} is possible: you will need to do two rolling bounces, where during the first rolling bounce phase you set the config upgrade.from="older version" -(possible values are "0.10.0" - "2.4") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager +(possible values are "0.10.0" - "2.3") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager Review comment: Awesome, thank you!
[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation
showuon commented on a change in pull request #8622: URL: https://github.com/apache/kafka/pull/8622#discussion_r427018759 ## File path: docs/streams/upgrade-guide.html ## @@ -35,7 +35,7 @@ Upgrade Guide and API Changes Upgrading from any older version to {{fullDotVersion}} is possible: you will need to do two rolling bounces, where during the first rolling bounce phase you set the config upgrade.from="older version" -(possible values are "0.10.0" - "2.4") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager +(possible values are "0.10.0" - "2.3") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager Review comment: Thanks for the suggestion, @ableegoldman, it makes it clearer! I've updated it in this commit https://github.com/apache/kafka/pull/8622/commits/a3accf681e72bbba9a774a464253c1ccd7746188. Thank you.
[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110830#comment-17110830 ] Matthias J. Sax commented on KAFKA-6520: [~guozhang] I think your idea about leveraging KIP-572 might not work. I dug through the code and none of the blocking calls that might throw a `TimeoutException` are on the regular processing code path. Blocking calls are made only during task initialization or restore. During normal processing, only `poll()` / `pause()` / `resume()` are called and those methods don't throw a `TimeoutException`. Thoughts? [~VinceMu] Yes, the main purpose is to have a KafkaStreams client state DISCONNECTED. Thread state is an internal implementation detail. > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Priority: Major > Labels: newbie, user-experience > > KIP WIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams] > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. This is a link to a related > issue. > - > Update: there are some discussions on the PR itself which leads me to think > that a more general solution should be at the ClusterConnectionStates rather > than at the Streams or even Consumer level. 
One proposal would be: > * Add a new metric named `failedConnection` in SelectorMetrics which is > recorded at `connect()` and `pollSelectionKeys()` functions, upon capturing the > IOException / RuntimeException which indicates the connection disconnected. > * And then users of Consumer / Streams can monitor on this metric, which > normally will only have close to zero values as we have transient > disconnects; if it is spiking, it means the brokers are consistently being > unavailable, indicating the state. > [~Yohan123] WDYT?
[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams
[ https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110829#comment-17110829 ] Matthias J. Sax commented on KAFKA-4327: Thanks [~jeqo]. The original idea was to move it to the `streams` module. Why do you want to move it to the `tools` module? Or are you referring to moving it to _package_ {{o.a.k.streams.tools}}? What parser we are using is an implementation detail and does not need to be discussed in a KIP. I am fine with your proposal. However, moving the class to a different module/package and removing the zookeeper parameter is a breaking change. Thus, we can work on this ticket only for the 3.0.0 release. It's unclear atm what the release number after 2.6 will be. There was a discussion to maybe do a major release, but as long as the decision is not made, I would recommend holding off. Writing the KIP could be done right now, but working on a PR would be too early as we don't know if/when it could get merged. > Move Reset Tool from core to streams > > > Key: KAFKA-4327 > URL: https://issues.apache.org/jira/browse/KAFKA-4327 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Jorge Esteban Quilcate Otoya >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008 > Currently, Kafka Streams Application Reset Tool is part of {{core}} module > due to ZK dependency. After KIP-4 got merged, this dependency can be dropped > and the Reset Tool can be moved to {{streams}} module. 
> This should also update {{InternalTopicManager#filterExistingTopics}} that > refers to ResetTool in an exception message: > {{"Use 'kafka.tools.StreamsResetter' tool"}} > -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}} > Doing this JIRA also requires updating the docs with regard to broker > backward compatibility -- not all brokers support "topic delete request" and > thus, the reset tool will not be backward compatible with all broker versions.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8622: MINOR: Update stream documentation
ableegoldman commented on a change in pull request #8622: URL: https://github.com/apache/kafka/pull/8622#discussion_r427013934 ## File path: docs/streams/upgrade-guide.html ## @@ -35,7 +35,7 @@ Upgrade Guide and API Changes Upgrading from any older version to {{fullDotVersion}} is possible: you will need to do two rolling bounces, where during the first rolling bounce phase you set the config upgrade.from="older version" -(possible values are "0.10.0" - "2.4") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager +(possible values are "0.10.0" - "2.3") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager Review comment: Can we just add a small qualifier in the first line? `you will need to do two rolling bounces` --> `if upgrading from 2.3 or below, you will need to do two rolling bounces` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation
showuon commented on a change in pull request #8622: URL: https://github.com/apache/kafka/pull/8622#discussion_r427012797 ## File path: docs/streams/upgrade-guide.html ## @@ -35,7 +35,7 @@ Upgrade Guide and API Changes Upgrading from any older version to {{fullDotVersion}} is possible: you will need to do two rolling bounces, where during the first rolling bounce phase you set the config upgrade.from="older version" -(possible values are "0.10.0" - "2.4") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager +(possible values are "0.10.0" - "2.3") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager Review comment: hi @ableegoldman , After reading the whole paragraph again, I think you're right. > (possible values are "0.10.0" - "2.3") and during the second you remove it. > This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. > you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. So, because we explicitly said the cooperative rebalancing protocol is available since 2.4+, I think keeping it as `2.3` here is fine and correct. Thank you.
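The two-bounce procedure discussed in this thread can be sketched in plain code. This is a hedged illustration only: `upgrade.from` is the Kafka Streams config name, and the sketch shows the property lifecycle across the two bounces, not an actual Streams deployment:

```java
import java.util.Properties;

public class UpgradeFromExample {
    // First rolling bounce (only needed when upgrading from 2.3 or below):
    // set upgrade.from to the version being upgraded from.
    public static Properties firstBounceConfig() {
        Properties props = new Properties();
        props.put("upgrade.from", "2.3");
        return props;
    }

    // Second rolling bounce: remove the config, which lets the group
    // switch over to the cooperative rebalancing protocol.
    public static Properties secondBounceConfig() {
        Properties props = firstBounceConfig();
        props.remove("upgrade.from");
        return props;
    }
}
```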
[GitHub] [kafka] ableegoldman commented on a change in pull request #8622: MINOR: Update stream documentation
ableegoldman commented on a change in pull request #8622: URL: https://github.com/apache/kafka/pull/8622#discussion_r427009991 ## File path: docs/streams/upgrade-guide.html ## @@ -35,7 +35,7 @@ Upgrade Guide and API Changes Upgrading from any older version to {{fullDotVersion}} is possible: you will need to do two rolling bounces, where during the first rolling bounce phase you set the config upgrade.from="older version" -(possible values are "0.10.0" - "2.4") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager +(possible values are "0.10.0" - "2.3") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager Review comment: @showuon Can we clarify that you only need to do this if you're upgrading from 2.3 or below? I know this seems implied by the fact that the config's possible values stop at 2.3 but there are always creative interpretations of seemingly obvious things 🙂 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation
showuon commented on a change in pull request #8622: URL: https://github.com/apache/kafka/pull/8622#discussion_r427009933 ## File path: docs/streams/upgrade-guide.html ## @@ -35,7 +35,7 @@ Upgrade Guide and API Changes Upgrading from any older version to {{fullDotVersion}} is possible: you will need to do two rolling bounces, where during the first rolling bounce phase you set the config upgrade.from="older version" -(possible values are "0.10.0" - "2.3") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager +(possible values are "0.10.0" - "2.4") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager Review comment: Thank you, @ableegoldman @abbccdda @bbejeck , I've reverted this version back to `2.3` now in this commit https://github.com/apache/kafka/pull/8622/commits/460768e71f5c7d427a6faffdded9b0478ade1db1. Thank you very much!
[GitHub] [kafka] ableegoldman commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent
ableegoldman commented on a change in pull request #8681: URL: https://github.com/apache/kafka/pull/8681#discussion_r427006280 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta } if (stores.containsKey(storeName)) { -throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName)); +log.warn("State Store {} has already been registered, which could be due to a half-way registration" + Review comment: Re: your concern, I don't think we can assume that a user's state store's `init` method is idempotent. AFAIK nothing should change that's relevant to the state store registration, but if something does (eg TaskCorrupted) we'd have to wipe out everything and start it all again anyways This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
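The idempotent registration being debated above can be reduced to a small sketch. This is not the actual `ProcessorStateManager` code, just a toy illustration of the PR's direction: a repeated registration of the same store name logs a warning and returns instead of throwing:

```java
import java.util.HashMap;
import java.util.Map;

public class StoreRegistry {
    private final Map<String, Object> stores = new HashMap<>();

    // Returns true if the store was newly registered, false if it was
    // already present (in which case registration is a warned no-op).
    public boolean registerStore(String storeName, Object store) {
        if (stores.containsKey(storeName)) {
            System.out.println("WARN: state store " + storeName
                    + " has already been registered; skipping re-registration");
            return false;
        }
        stores.put(storeName, store);
        return true;
    }
}
```

Whether to keep throwing (the reviewer suggests `IllegalStateException` if so) or to warn-and-return hinges on whether a half-way registration can legitimately be retried.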
[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations
showuon commented on a change in pull request #8623: URL: https://github.com/apache/kafka/pull/8623#discussion_r425522820 ## File path: docs/security.html ## @@ -1438,7 +1438,7 @@ Examples bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic By default, all principals that don't have an explicit acl that allows access for an operation to a resource are denied. In rare cases where an allow acl is defined that allows access to all but some principal we will have to use the --deny-principal and --deny-host option. For example, if we want to allow all users to Read from Test-topic but only deny User:BadBob from IP 198.51.100.3 we can do so using following commands: bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic -Note that ``--allow-host`` and ``deny-host`` only support IP addresses (hostnames are not supported). +Note that --allow-host and --deny-host only support IP addresses (hostnames are not supported). Review comment: The ` `` ` format isn't used anywhere else in the documentation. Replace it with <code> formatting here. before: ![image](https://user-images.githubusercontent.com/43372967/82280446-793cff80-99c1-11ea-8d64-1d58a51da62a.png) after: ![image](https://user-images.githubusercontent.com/43372967/82002970-ce130a00-9691-11ea-8ffc-8ed41b3a55a4.png)
[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations
showuon commented on a change in pull request #8623: URL: https://github.com/apache/kafka/pull/8623#discussion_r420534565 ## File path: docs/security.html ## @@ -398,7 +398,7 @@ Host Name Verification ssl.keystore.password=test1234 ssl.key.password=test1234 -Other configuration settings that may also be needed depending on our requirements and the broker configuration: +Other configuration settings that may also be needed depending on requirements and the broker configuration: Review comment: Remove the redundant `our` here before: ![image](https://user-images.githubusercontent.com/43372967/82280333-40048f80-99c1-11ea-9cb0-be70c54a6745.png) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #8623: MINOR: Update the documentations
showuon commented on a change in pull request #8623: URL: https://github.com/apache/kafka/pull/8623#discussion_r420534465 ## File path: docs/security.html ## @@ -361,9 +361,9 @@ Host Name Verification The JRE/JDK will have a default pseudo-random number generator (PRNG) that is used for cryptography operations, so it is not required to configure the -implementation used with the ssl.secure.random.implementation. However, there are performance issues with some implementations (notably, the -default chosen on Linux systems, NativePRNG, utilizes a global lock). In cases where performance of SSL connections becomes an issue, -consider explicitly setting the implementation to be used. The SHA1PRNG implementation is non-blocking, and has shown very good performance +implementation used with the ssl.secure.random.implementation. However, there are performance issues with some implementations (notably, the +default chosen on Linux systems, NativePRNG, utilizes a global lock). In cases where performance of SSL connections becomes an issue, +consider explicitly setting the implementation to be used. The SHA1PRNG implementation is non-blocking, and has shown very good performance Review comment: Fix wrong content format. **Before:** ![圖片](https://user-images.githubusercontent.com/43372967/81137668-1fd0db80-8f92-11ea-9502-026dba38b031.png) **After:** ![圖片](https://user-images.githubusercontent.com/43372967/81137626-0039b300-8f92-11ea-818e-15b4aed2270b.png) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
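For readers of the passage being reformatted above: when the global lock in Linux's default NativePRNG becomes an SSL bottleneck, the implementation can be set explicitly. A minimal config fragment (SHA1PRNG is the non-blocking implementation named in the text; whether it fits your security requirements is a separate judgment):

```properties
ssl.secure.random.implementation=SHA1PRNG
```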
[GitHub] [kafka] abbccdda commented on pull request #8682: KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll
abbccdda commented on pull request #8682: URL: https://github.com/apache/kafka/pull/8682#issuecomment-630551559 Will attempt Sophie's suggestion here
[GitHub] [kafka] ableegoldman commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent
ableegoldman commented on a change in pull request #8681: URL: https://github.com/apache/kafka/pull/8681#discussion_r427003560 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta } if (stores.containsKey(storeName)) { -throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName)); +log.warn("State Store {} has already been registered, which could be due to a half-way registration" + Review comment: Nah, I think we should actually keep this (although `IllegalStateException` seems to make more sense, can we change it?) -- we should just make sure we don't reach it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9989: --- Attachment: 166.tgz > StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor > gets assigned task > - > > Key: KAFKA-9989 > URL: https://issues.apache.org/jira/browse/KAFKA-9989 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Boyang Chen >Assignee: HaiyuanZhao >Priority: Major > Labels: newbie > Attachments: 166.tgz > > > System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: > "Never saw output 'processed [0-9]* records' on ubuntu@worker6" > which if we take a closer look at, the rebalance happens but has no task > assignment. We should fix this problem by making the rebalance result as part > of the check, and wait for the finalized assignment (non-empty) before > kicking off the record processing validation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] abbccdda commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent
abbccdda commented on a change in pull request #8681: URL: https://github.com/apache/kafka/pull/8681#discussion_r426998545 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta } if (stores.containsKey(storeName)) { -throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName)); +log.warn("State Store {} has already been registered, which could be due to a half-way registration" + Review comment: So are we still required to remove the illegal argument exception here? What I'm concerned about is that the latest version of state store initialization might be different from the previous iteration, so it's safer to just go through the entire procedure once more.
[GitHub] [kafka] abbccdda commented on a change in pull request #8680: KAFKA-9755: Implement read path for feature versioning system (KIP-584)
abbccdda commented on a change in pull request #8680: URL: https://github.com/apache/kafka/pull/8680#discussion_r426884892 ## File path: clients/src/main/java/org/apache/kafka/common/feature/VersionLevelRange.java ## @@ -0,0 +1,39 @@ +package org.apache.kafka.common.feature; + +import java.util.Map; + +/** + * A specialization of VersionRange representing a range of version levels. The main specialization + * is that the class uses different serialization keys for min/max attributes. + * + * NOTE: This is the backing class used to define the min/max version levels for finalized features. + */ +public class VersionLevelRange extends VersionRange { Review comment: In terms of naming, do you think `FinalizedVersionRange` is more explicit? Also when I look closer at the class hierarchy, I feel the sharing point between finalized version range and supported version range should be extracted to avoid weird inheritance. What I'm proposing is to have `VersionRange` as a super class with two subclasses: `SupportedVersionRange` and `FinalizedVersionRange`, and make `minKeyLabel` and `maxKeyLabel` abstract functions, WDYT? ## File path: clients/src/test/java/org/apache/kafka/common/feature/VersionRangeTest.java ## @@ -0,0 +1,150 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class VersionRangeTest { +@Test +public void testFailDueToInvalidParams() { +// min and max can't be < 1. +assertThrows( +IllegalArgumentException.class, +() -> new VersionRange(0, 0)); +assertThrows( +IllegalArgumentException.class, +() -> new VersionRange(-1, -1)); +// min can't be < 1. 
+assertThrows( +IllegalArgumentException.class, +() -> new VersionRange(0, 1)); +assertThrows( +IllegalArgumentException.class, +() -> new VersionRange(-1, 1)); +// max can't be < 1. +assertThrows( +IllegalArgumentException.class, +() -> new VersionRange(1, 0)); +assertThrows( +IllegalArgumentException.class, +() -> new VersionRange(1, -1)); +// min can't be > max. +assertThrows( +IllegalArgumentException.class, +() -> new VersionRange(2, 1)); +} + +@Test +public void testSerializeDeserializeTest() { +VersionRange versionRange = new VersionRange(1, 2); +assertEquals(1, versionRange.min()); +assertEquals(2, versionRange.max()); + +Map serialized = versionRange.serialize(); +assertEquals( +new HashMap() { +{ +put("min_version", versionRange.min()); +put("max_version", versionRange.max()); +} +}, +serialized +); + +VersionRange deserialized = VersionRange.deserialize(serialized); +assertEquals(1, deserialized.min()); +assertEquals(2, deserialized.max()); +assertEquals(versionRange, deserialized); +} + +@Test +public void testDeserializationFailureTest() { +// min_version can't be < 1. +Map invalidWithBadMinVersion = new HashMap() { +{ +put("min_version", 0L); +put("max_version", 1L); +} +}; +assertThrows( +IllegalArgumentException.class, +() -> VersionRange.deserialize(invalidWithBadMinVersion)); + +// max_version can't be < 1. +Map invalidWithBadMaxVersion = new HashMap() { +{ +put("min_version", 1L); +put("max_version", 0L); +} +}; +assertThrows( +IllegalArgumentException.class, +() -> VersionRange.deserialize(invalidWithBadMaxVersion)); + +// min_version and max_version can't be < 1. +Map invalidWithBadMinMaxVersion = new HashMap() { +{ +put("min_version", 0L); +put("max_version", 0L); +} +}; +assertThrows( +IllegalArgumentException.class, +() -> VersionRange.deserialize(invalidWithBadMinMaxVersion)); + +// min_version can't be > max_version. 
+Map invalidWithLowerMaxVersion = new HashMap() { +{ +put("min_version", 2L); +put("max_version", 1L); +} +}; +assertThrows( +IllegalArgumentException.class, +() -> VersionRange.deserialize(invalidWithLowerMaxVersion)); + +// min_version key missing. +Map invalidWithMinKeyMissing = n
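The hierarchy the reviewer proposes can be sketched as follows. Note these names follow the reviewer's suggestion (`SupportedVersionRange`, `FinalizedVersionRange`, abstract key labels), not necessarily the API that was eventually merged, and the serialization key strings are illustrative assumptions:

```java
// Base class validates the range; subclasses supply the serialization key labels.
public abstract class VersionRangeSketch {
    private final long min;
    private final long max;

    protected VersionRangeSketch(long min, long max) {
        // Mirrors the invariants exercised by the tests above: 1 <= min <= max.
        if (min < 1 || max < 1 || min > max) {
            throw new IllegalArgumentException(
                "Expected 1 <= min <= max, got min=" + min + ", max=" + max);
        }
        this.min = min;
        this.max = max;
    }

    // Abstract label functions, as suggested in the review comment.
    protected abstract String minKeyLabel();
    protected abstract String maxKeyLabel();

    public long min() { return min; }
    public long max() { return max; }
}

class SupportedVersionRange extends VersionRangeSketch {
    SupportedVersionRange(long min, long max) { super(min, max); }
    protected String minKeyLabel() { return "min_version"; }
    protected String maxKeyLabel() { return "max_version"; }
}

class FinalizedVersionRange extends VersionRangeSketch {
    FinalizedVersionRange(long min, long max) { super(min, max); }
    // Assumed labels for the finalized case; the point is only that they differ.
    protected String minKeyLabel() { return "min_version_level"; }
    protected String maxKeyLabel() { return "max_version_level"; }
}
```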
[jira] [Commented] (KAFKA-10016) Support For Purge Topic
[ https://issues.apache.org/jira/browse/KAFKA-10016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110772#comment-17110772 ] Matthias J. Sax commented on KAFKA-10016: - Isn't this already supported via [https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+deleteRecordsBefore%28%29+API+in+AdminClient]? > Support For Purge Topic > --- > > Key: KAFKA-10016 > URL: https://issues.apache.org/jira/browse/KAFKA-10016 > Project: Kafka > Issue Type: Improvement >Reporter: David Mollitor >Priority: Major > > Some discussions about how to purge a topic. Please add native support for > this operation. Is there a "starting offset" for each topic? Such a vehicle > would allow for this value to be easily set with the current offeset and the > brokers will skip (and clean) everything before that. > > [https://stackoverflow.com/questions/16284399/purge-kafka-topic] > > {code:none} > kafka-topics --topic mytopic --purge > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
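The KIP-107 API referenced in the comment effectively "purges" by advancing the log start offset. A self-contained toy model of that semantic (not the AdminClient API itself, which in the real client is `Admin#deleteRecords` with a `RecordsToDelete.beforeOffset(...)` argument):

```java
import java.util.ArrayList;
import java.util.List;

public class PurgeSketch {
    // Toy partition log: records indexed by offset starting at logStartOffset.
    private final List<String> records = new ArrayList<>();
    private long logStartOffset = 0;

    public long append(String record) {
        records.add(record);
        return logStartOffset + records.size() - 1;
    }

    // deleteRecordsBefore(offset) semantics: everything below the given
    // offset becomes unreadable and the log start offset moves forward.
    public void deleteRecordsBefore(long offset) {
        while (logStartOffset < offset && !records.isEmpty()) {
            records.remove(0);
            logStartOffset++;
        }
    }

    public long logStartOffset() { return logStartOffset; }
    public int size() { return records.size(); }
}
```

"Purging" the whole topic is then just deleting before the current end offset of each partition, which matches Matthias's point that the building block already exists.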
[jira] [Updated] (KAFKA-10004) KAFKA-10004: ConfigCommand fails to find default broker configs without ZK
[ https://issues.apache.org/jira/browse/KAFKA-10004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-10004: - Summary: KAFKA-10004: ConfigCommand fails to find default broker configs without ZK (was: using kafka-configs.sh --describe for brokers will have error when querying default broker) > KAFKA-10004: ConfigCommand fails to find default broker configs without ZK > -- > > Key: KAFKA-10004 > URL: https://issues.apache.org/jira/browse/KAFKA-10004 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 2.6.0 > > > When running > {code:java} > bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 > --entity-type brokers > {code} > the output will be: > Dynamic configs for broker 0 are: > Dynamic configs for broker are: > *The entity name for brokers must be a valid integer broker id, found: > * > > The default entity cannot successfully get the configs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #8677: KAFKA-9999: Make internal topic creation error non-fatal
ableegoldman commented on a change in pull request #8677: URL: https://github.com/apache/kafka/pull/8677#discussion_r426983346 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -171,7 +173,7 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams "This can happen if the Kafka cluster is temporary not available. " + "You can increase admin client config `retries` to be resilient against this error.", retries); log.error(timeoutAndRetryError); -throw new StreamsException(timeoutAndRetryError); +throw new TaskMigratedException("Time out for creating internal topics", new TimeoutException(timeoutAndRetryError)); Review comment: Actually, I'm not sure we necessarily even _need_ to call on the `FallbackPriorTaskAssignor`, we just need to schedule the followup and remove the affected tasks from the assignment This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #8675: KAFKA-10004: Fix the default broker configs cannot be displayed when using kafka-configs.sh --describe
cmccabe commented on pull request #8675: URL: https://github.com/apache/kafka/pull/8675#issuecomment-630526017 @showuon : thanks for the fix! You might want to fix your git configuration since your email is showing up as `showuon <43372967+show...@users.noreply.github.com>`
[GitHub] [kafka] cmccabe merged pull request #8675: KAFKA-10004: Fix the default broker configs cannot be displayed when using kafka-configs.sh --describe
cmccabe merged pull request #8675: URL: https://github.com/apache/kafka/pull/8675
[GitHub] [kafka] ableegoldman commented on a change in pull request #8677: KAFKA-9999: Make internal topic creation error non-fatal
ableegoldman commented on a change in pull request #8677: URL: https://github.com/apache/kafka/pull/8677#discussion_r426981901 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -171,7 +173,7 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams "This can happen if the Kafka cluster is temporary not available. " + "You can increase admin client config `retries` to be resilient against this error.", retries); log.error(timeoutAndRetryError); -throw new StreamsException(timeoutAndRetryError); +throw new TaskMigratedException("Time out for creating internal topics", new TimeoutException(timeoutAndRetryError)); Review comment: Good point.. what if we just call on the `FallbackPriorTaskAssignor` like we do when `listOffsets` fails, and then remove any tasks that involve internal topics we failed to create? And schedule the followup rebalance for "immediately"
[GitHub] [kafka] jiameixie commented on pull request #8692: KAFKA-10018:Change sh to bash
jiameixie commented on pull request #8692: URL: https://github.com/apache/kafka/pull/8692#issuecomment-630523741 @ijuma @abbccdda @mjsax @halorgium @astubbs @alexism @glasser PTAL, thanks
[GitHub] [kafka] cmccabe commented on pull request #8675: KAFKA-10004: Fix the default broker configs cannot be displayed when using kafka-configs.sh --describe
cmccabe commented on pull request #8675: URL: https://github.com/apache/kafka/pull/8675#issuecomment-630523110 LGTM
[GitHub] [kafka] ableegoldman edited a comment on pull request #8682: KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll
ableegoldman edited a comment on pull request #8682: URL: https://github.com/apache/kafka/pull/8682#issuecomment-630522346 I also think we should reset/clear the set at the beginning of `tryToLockAllNonEmptyTaskDirectories`, so basically we're always dealing with the current set of actually-locked tasks and don't need to worry about removing them during `handleLostAll` or `handleCorruption/Assignment`, etc
[GitHub] [kafka] ableegoldman commented on pull request #8682: KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll
ableegoldman commented on pull request #8682: URL: https://github.com/apache/kafka/pull/8682#issuecomment-630522346 I also think we should reset/clear the set at the beginning of `tryToLockAllNonEmptyTaskDirectories`
[GitHub] [kafka] jiameixie opened a new pull request #8692: KAFKA-10018:Change sh to bash
jiameixie opened a new pull request #8692: URL: https://github.com/apache/kafka/pull/8692 "#!/bin/sh" is used in kafka-server-stop.sh and zookeeper-server-stop.sh, but these scripts use `[[`, which is a bash builtin. Modern Debian and Ubuntu systems symlink sh to dash by default, so "[[: not found" occurs. Changing "#!/bin/sh" to "#!/bin/bash" avoids this error. This change makes all scripts use bash. Change-Id: I733c6e31f76d768e71ac0e040a33da8f4bd8f005 Signed-off-by: Jiamei Xie ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
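The incompatibility this PR addresses can be sketched with explicit shell invocations (a minimal illustration, not taken from the Kafka scripts): `[[` is a bash builtin absent from POSIX sh, so under dash, which modern Debian/Ubuntu symlink /bin/sh to, a script using it fails with "[[: not found". A POSIX-portable `case` statement works under either shell.

```shell
# `[[` with pattern matching is bash-only; invoking bash explicitly always works.
bash -c '[[ "kafka-server-stop.sh" == *stop* ]] && echo "bash: pattern matched"'

# The POSIX-portable equivalent uses `case`, which dash and bash both support.
sh -c 'case "kafka-server-stop.sh" in *stop*) echo "sh: pattern matched";; esac'
```

Switching the shebang to `#!/bin/bash`, as the PR does, is the smaller change; rewriting the `[[` tests in POSIX syntax would be the alternative if /bin/bash could not be assumed.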
[GitHub] [kafka] guozhangwang commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent
guozhangwang commented on a change in pull request #8681: URL: https://github.com/apache/kafka/pull/8681#discussion_r426975369 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta } if (stores.containsKey(storeName)) { -throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName)); +log.warn("State Store {} has already been registered, which could be due to a half-way registration" + Review comment: +1, we can rely on `storeManager#getStore` inside `StateManagerUtil` to check if the store is already registered. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta } if (stores.containsKey(storeName)) { -throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName)); +log.warn("State Store {} has already been registered, which could be due to a half-way registration" + +"in the previous round", storeName); Review comment: nit: we could make the warn log entry more clear that we did not override the registered store, e.g. "Skipped registering state store {} since it has already existed in the state manager, ..."
[jira] [Created] (KAFKA-10018) Change sh to bash
jiamei xie created KAFKA-10018: -- Summary: Change sh to bash Key: KAFKA-10018 URL: https://issues.apache.org/jira/browse/KAFKA-10018 Project: Kafka Issue Type: Bug Components: admin Reporter: jiamei xie Assignee: jiamei xie "#!/bin/sh" is used in kafka-server-stop.sh and zookeeper-server-stop.sh, but these scripts use `[[`, which is a bash builtin. Modern Debian and Ubuntu systems symlink sh to dash by default, so "[[: not found" occurs. Changing "#!/bin/sh" to "#!/bin/bash" avoids this error. This change makes all scripts use bash. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on pull request #8682: KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll
guozhangwang commented on pull request #8682: URL: https://github.com/apache/kafka/pull/8682#issuecomment-630515587 I agree with @ableegoldman here, after the `while (taskIdIterator.hasNext()` loop we can see if there are still remaining tasks, and then log a WARN similar to the end of `handleRevocation` before clearing them: ``` if (!remainingPartitions.isEmpty()) { log.warn("The following partitions {} are missing from the task partitions. It could potentially " + "due to race condition of consumer detecting the heartbeat failure, or the tasks " + "have been cleaned up by the handleAssignment callback.", remainingPartitions); } ```
[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110746#comment-17110746 ] Vince Mu commented on KAFKA-6520: - [~guozhang] that makes sense. I just want to clarify whether we are adding a DISCONNECTED state purely to streamThread or whether we need to add it to the KafkaStreams client as well? If it's the latter, would it make sense to transition the client to the DISCONNECTED state when all its streamThreads are in the DISCONNECTED state? > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Priority: Major > Labels: newbie, user-experience > > KIP WIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams] > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. This is a link to a related > issue. > - > Update: there are some discussions on the PR itself which leads me to think > that a more general solution should be at the ClusterConnectionStates rather > than at the Streams or even Consumer level.
One proposal would be: > * Add a new metric named `failedConnection` in SelectorMetrics which is > recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the > IOException / RuntimeException which indicates the connection disconnected. > * And then users of Consumer / Streams can monitor on this metric, which > normally will only have close to zero values as we have transient > disconnects, if it is spiking it means the brokers are consistently being > unavailable, indicating the state. > [~Yohan123] WDYT?
[GitHub] [kafka] hachikuji commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
hachikuji commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r426944679 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -369,6 +369,31 @@ class GroupCoordinator(val brokerId: Int, } } + /** + * try to complete produce, fetch and delete requests if the HW of partition is incremented. Otherwise, we try to complete + * only delayed fetch requests. + * + * Noted that this method may hold a lot of group lock so the caller should NOT hold any group lock + * in order to avoid deadlock + * @param topicPartitions topic partition and leaderHWIncremented + */ + private[this] def completeDelayedRequests(topicPartitions: Map[TopicPartition, Boolean]): Unit = +topicPartitions.foreach { + case (tp, leaderHWIncremented) => +if (leaderHWIncremented) groupManager.replicaManager.completeDelayedRequests(tp) +else groupManager.replicaManager.completeDelayedFetchRequests(tp) +} + + /** + * complete the delayed join requests associated to input group keys. + * + * Noted that this method may hold a lot of group lock so the caller should NOT hold any group lock + * in order to avoid deadlock + * @param groupKeys group keys to complete + */ + private[this] def completeDelayedJoinRequests(groupKeys: Set[GroupKey]): Unit = +groupKeys.foreach(joinPurgatory.checkAndComplete) Review comment: Hmm.. Does the group purgatory suffer from the same deadlock potential? If we call `checkAndComplete` for a group "foo," I don't think we would attempt completion for any other group. ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -970,7 +970,16 @@ class Partition(val topicPartition: TopicPartition, } } - def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = { + /** + * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may + *produce deadlock if caller already holds a group lock. 
Hence, caller should pass + *false to disable completion and then complete the delayed requests after releasing + *held group lock + */ + def appendRecordsToLeader(records: MemoryRecords, +origin: AppendOrigin, +requiredAcks: Int, +completeDelayedRequests: Boolean): LogAppendResult = { Review comment: Currently we have a somewhat convoluted model where `ReplicaManager` creates delayed operations, but we depend on lower level components like `Partition` to be aware of them and complete them. This breaks encapsulation. Not something we should try to complete in this PR, but as an eventual goal, I think we can consider trying to factor delayed operations out of `Partition` so that they can be managed by `ReplicaManager` exclusively. If you assume that is the end state, then we could drop `completeDelayedRequests` and let `ReplicaManager` _always_ be responsible for checking delayed operations after appending to the log. Other than `ReplicaManager`, the only caller of this method is `GroupMetadataManager` which uses it during offset expiration. I think the only reason we do this is because we didn't want to waste purgatory space. I don't think that's a good enough reason to go outside the normal flow. It would be simpler to follow the same path. Potentially we could make the callback an `Option` so that we still have a way to avoid polluting the purgatory. 
## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -769,20 +813,25 @@ class GroupCoordinator(val brokerId: Int, // on heartbeat response to eventually notify the rebalance in progress signal to the consumer val member = group.get(memberId) completeAndScheduleNextHeartbeatExpiration(group, member) -groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback) +partitionsToComplete ++= groupManager.storeOffsets( + group = group, + consumerId = memberId, + offsetMetadata = offsetMetadata, + responseCallback = responseCallback, + completeDelayedRequests = false) case CompletingRebalance => // We should not receive a commit request if the group has not completed rebalance; // but since the consumer's member.id and generation is valid, it means it has received // the latest group generation information from the JoinResponse. // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully. responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.RE
[GitHub] [kafka] ableegoldman commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent
ableegoldman commented on a change in pull request #8681: URL: https://github.com/apache/kafka/pull/8681#discussion_r426958947 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta } if (stores.containsKey(storeName)) { -throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName)); +log.warn("State Store {} has already been registered, which could be due to a half-way registration" + Review comment: I think we might want to skip the re-registration higher up the call stack. In `StateManagerUtil#registerStateStores` we call `store.init` on each store which ultimately results in this `registerStore` being called
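The fix the reviewers converge on here — skipping a duplicate registration with a warning instead of throwing — can be sketched in isolation. This is a hypothetical miniature, not the actual `ProcessorStateManager` (the class and field names below are illustrative): a second `registerStore` call for the same store name, e.g. from a half-way registration in a previous round, returns without overriding the existing store or crashing the stream thread.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of idempotent state store registration.
public class StateStoreRegistry {
    private final Map<String, Object> stores = new HashMap<>();

    /** Returns true if the store was newly registered, false if the call was skipped. */
    public boolean registerStore(final String storeName, final Object store) {
        if (stores.containsKey(storeName)) {
            // Do not override the already-registered store; just warn and skip,
            // mirroring the log message suggested in the review above.
            System.out.println("Skipped registering state store " + storeName
                + " since it already exists in the state manager");
            return false;
        }
        stores.put(storeName, store);
        return true;
    }

    public static void main(final String[] args) {
        final StateStoreRegistry registry = new StateStoreRegistry();
        System.out.println(registry.registerStore("agg-store", new Object())); // true
        System.out.println(registry.registerStore("agg-store", new Object())); // false, skipped
    }
}
```

Checking higher up the call stack (before `store.init` is ever invoked again), as ableegoldman suggests, achieves the same end state while also avoiding redundant store initialization.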
[GitHub] [kafka] xiaodongdu commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xiaodongdu commented on pull request #8691: URL: https://github.com/apache/kafka/pull/8691#issuecomment-630496674 @xvrl @rhauch @ijuma Could you review this PR for KIP-606. Thanks,
[jira] [Assigned] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HaiyuanZhao reassigned KAFKA-9989: -- Assignee: HaiyuanZhao > StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor > gets assigned task > - > > Key: KAFKA-9989 > URL: https://issues.apache.org/jira/browse/KAFKA-9989 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Boyang Chen >Assignee: HaiyuanZhao >Priority: Major > Labels: newbie > > System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: > "Never saw output 'processed [0-9]* records' on ubuntu@worker6" > which if we take a closer look at, the rebalance happens but has no task > assignment. We should fix this problem by making the rebalance result as part > of the check, and wait for the finalized assignment (non-empty) before > kicking off the record processing validation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on a change in pull request #8690: KAFKA-9965: Fix uneven distribution in RoundRobinPartitioner
ijuma commented on a change in pull request #8690: URL: https://github.com/apache/kafka/pull/8690#discussion_r426950830 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java ## @@ -65,12 +65,20 @@ public int partition(String topic, Object key, byte[] keyBytes, Object value, by } private int nextValue(String topic) { -AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> { -return new AtomicInteger(0); -}); +AtomicInteger counter = topicCounterMap. +computeIfAbsent(topic, k -> new AtomicInteger(0)); return counter.getAndIncrement(); } +@Override +public void onNewBatch(String topic, Cluster cluster, int prevPartition) { +// After onNewBatch is called, we will call partition() again. +// So 'rewind' the counter for this topic. +AtomicInteger counter = topicCounterMap. +computeIfAbsent(topic, k -> new AtomicInteger(0)); +counter.getAndDecrement(); Review comment: Is this an issue for third party partitioners as well?
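The uneven distribution the diff above fixes arises because the producer calls `partition()` a second time for the same record whenever it opens a new batch; without a compensating decrement, the counter skips a partition each time that happens. The counter logic can be sketched standalone (a simplified model, not the full `Partitioner` interface or the actual `RoundRobinPartitioner` class):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the per-topic round-robin counter: partition() consumes the next
// value, and onNewBatch() rewinds it by one because the producer will call
// partition() again for the same record once the new batch is ready.
public class RoundRobinCounter {
    private final Map<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public int nextValue(final String topic) {
        return topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0))
                .getAndIncrement();
    }

    public void onNewBatch(final String topic) {
        // Rewind so the repeated partition() call lands on the same partition
        // instead of silently skipping one.
        topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0))
                .getAndDecrement();
    }

    public static void main(final String[] args) {
        final RoundRobinCounter counter = new RoundRobinCounter();
        final int first = counter.nextValue("events");  // takes counter value 0
        counter.onNewBatch("events");                   // producer opens a new batch
        final int retry = counter.nextValue("events");  // 0 again: no partition skipped
        System.out.println(first + " " + retry);
    }
}
```

Any third-party partitioner that derives the partition from mutable state inside `partition()` faces the same double-call hazard ijuma raises, since `onNewBatch` was added precisely to let partitioners compensate for it.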
[GitHub] [kafka] xiaodongdu opened a new pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xiaodongdu opened a new pull request #8691: URL: https://github.com/apache/kafka/pull/8691 Implement KIP-606, add metadata context to MetricsReporter: Added a new api to MetricsReporter to allow client to expose additional metadata fields to reporter plugin. Added an interface MetricsContext to encapsulate metadata. Deprecated JmxReporter(String prefix) constructor. The prefix will be passed to the reporter via MetricsContext. Replaced existing usage of JmxReporter with the default JmxReporter and pass JMX prefix to MetricsContext using _namespace as key. From Kafka broker, populate MetricsContext with: kafka.cluster.id and kafka.broker.id From Connect, populate MetricsContext with: connect.kafka.cluster.id, connect.group.id Committer Checklist (excluded from commit message) Verify design and implementation Verify test coverage and CI build status Verify documentation (including upgrade notes)
[GitHub] [kafka] cmccabe opened a new pull request #8690: KAFKA-9965: Fix uneven distribution in RoundRobinPartitioner
cmccabe opened a new pull request #8690: URL: https://github.com/apache/kafka/pull/8690
[jira] [Updated] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9989: --- Description: System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: "Never saw output 'processed [0-9]* records' on ubuntu@worker6" which if we take a closer look at, the rebalance happens but has no task assignment. We should fix this problem by making the rebalance result as part of the check, and wait for the finalized assignment (non-empty) before kicking off the record processing validation. was: System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: "Never saw output 'processed [0-9]* records' on ubuntu@worker6" which if we take a closer look at, the rebalance happens but has no task assignment. We should fix this problem by making the rebalance result as part of the check, and wait for the finalized assignment before kicking off the record processing validation. > StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor > gets assigned task > - > > Key: KAFKA-9989 > URL: https://issues.apache.org/jira/browse/KAFKA-9989 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Boyang Chen >Priority: Major > Labels: newbie > > System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: > "Never saw output 'processed [0-9]* records' on ubuntu@worker6" > which if we take a closer look at, the rebalance happens but has no task > assignment. We should fix this problem by making the rebalance result as part > of the check, and wait for the finalized assignment (non-empty) before > kicking off the record processing validation.
[jira] [Updated] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9989: --- Description: System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: "Never saw output 'processed [0-9]* records' on ubuntu@worker6" which if we take a closer look at, the rebalance happens but has no task assignment. We should fix this problem by making the rebalance result as part of the check, and wait for the finalized assignment before kicking off the record processing validation. was: System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: "Never saw output 'processed [0-9]* records' on ubuntu@worker6" which if we take a closer look at, the rebalance happens but has no task assignment. We should fix this problem by making the rebalance result as part of the check, and skip the record processing validation when the assignment is empty. > StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor > gets assigned task > - > > Key: KAFKA-9989 > URL: https://issues.apache.org/jira/browse/KAFKA-9989 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: > "Never saw output 'processed [0-9]* records' on ubuntu@worker6" > which if we take a closer look at, the rebalance happens but has no task > assignment. We should fix this problem by making the rebalance result as part > of the check, and wait for the finalized assignment before kicking off the > record processing validation.
[jira] [Assigned] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-9989: -- Assignee: (was: Boyang Chen) > StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor > gets assigned task > - > > Key: KAFKA-9989 > URL: https://issues.apache.org/jira/browse/KAFKA-9989 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Boyang Chen >Priority: Major > > System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: > "Never saw output 'processed [0-9]* records' on ubuntu@worker6" > which if we take a closer look at, the rebalance happens but has no task > assignment. We should fix this problem by making the rebalance result as part > of the check, and wait for the finalized assignment before kicking off the > record processing validation.
[jira] [Updated] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9989: --- Labels: newbie (was: ) > StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor > gets assigned task > - > > Key: KAFKA-9989 > URL: https://issues.apache.org/jira/browse/KAFKA-9989 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Boyang Chen >Priority: Major > Labels: newbie > > System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: > "Never saw output 'processed [0-9]* records' on ubuntu@worker6" > which if we take a closer look at, the rebalance happens but has no task > assignment. We should fix this problem by making the rebalance result as part > of the check, and wait for the finalized assignment before kicking off the > record processing validation.
[jira] [Updated] (KAFKA-10010) Should make state store registration idempotent
[ https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10010: Summary: Should make state store registration idempotent (was: Should close standby task for safety during HandleLostAll) > Should make state store registration idempotent > --- > > Key: KAFKA-10010 > URL: https://issues.apache.org/jira/browse/KAFKA-10010 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > The current lost all logic doesn't close standby task, which could > potentially lead to a tricky condition like below: > 1. The standby task was initializing as `CREATED` state, and task corrupted > exception was thrown from registerStateStores > 2. The task corrupted exception was caught, and do a non-affected task commit > 3. The task commit failed due to task migrated exception > 4. The handleLostAll didn't close the standby task, leaving it as CREATED > state > 5. Next rebalance complete, the same task was assigned back as standby task. > 6. Illegal Argument exception caught : > {code:java} > [2020-05-16T11:56:18-07:00] > (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 > 18:56:18,050] ERROR > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > stream-thread > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > Encountered the following exception during processing and the thread is going > to shut down: (org.apache.kafka.streams.processor.internals.StreamThread) > [2020-05-16T11:56:18-07:00] > (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) > java.lang.IllegalArgumentException: stream-thread > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already > been registered. 
> at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269) > at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112) > at > org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85) > at > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82) > at > org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89) > at > org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509) > {code}
[GitHub] [kafka] cadonna commented on a change in pull request #8689: KAFKA-6145: Add unit tests to verify fix of bug KAFKA-9173
cadonna commented on a change in pull request #8689: URL: https://github.com/apache/kafka/pull/8689#discussion_r426924669 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java ## @@ -170,7 +171,7 @@ public void shouldAssignActiveStatefulTasksEvenlyOverUnevenlyDistributedStreamTh } @Test -public void shouldAssignActiveStatefulTasksEvenlyOverClientsWithLessClientsThanTasks() { +public void shouldAssignActiveStatefulTasksEvenlyOverClientsWithMoreClientsThanTasks() { Review comment: This name did not seem correct.
[GitHub] [kafka] cadonna commented on pull request #8689: KAFKA-6145: Add unit tests to verify fix of bug KAFKA-9173
cadonna commented on pull request #8689: URL: https://github.com/apache/kafka/pull/8689#issuecomment-630467234 Call for review: @vvcephei @ableegoldman
[GitHub] [kafka] cadonna opened a new pull request #8689: KAFKA-6145: Add unit tests to verify fix of bug KAFKA-9173
cadonna opened a new pull request #8689: URL: https://github.com/apache/kafka/pull/8689 Unit tests - shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWithMoreStreamThreadsThanTasks() - shouldAssignWarmUpTasksIfStatefulActiveTasksBalancedOverStreamThreadsButNotOverClients() - shouldEvenlyAssignActiveStatefulTasksIfClientsAreWarmedUpToBalanceTaskOverClients() verify that bug KAFKA-9173 is fixed with the new HighAvailabilityTaskAssignor. shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWithMoreStreamThreadsThanTasks() ensures that tasks are evenly assigned over clients when all overprovisioned clients join simultaneously. shouldAssignWarmUpTasksIfStatefulActiveTasksBalancedOverStreamThreadsButNotOverClients() ensures that warm-up tasks are assigned to two new clients that join the group although the assignment is already balanced over stream threads. shouldEvenlyAssignActiveStatefulTasksIfClientsAreWarmedUpToBalanceTaskOverClients() ensures that stateful active tasks are balanced over the previous and warmed-up clients although the previous assignment is balanced over stream threads. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
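The balance property these unit tests verify — active stateful tasks spread evenly over *clients*, not merely over stream threads — can be illustrated with a toy round-robin assignment. This is a hypothetical sketch for illustration only, not the actual HighAvailabilityTaskAssignor logic; all names here are invented:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class EvenAssignmentSketch {
    // Distribute taskCount tasks over clientCount clients round-robin,
    // so per-client load differs by at most one task regardless of how
    // many stream threads any single client runs.
    public static Map<Integer, Integer> assign(int taskCount, int clientCount) {
        Map<Integer, Integer> load = new HashMap<>();
        for (int c = 0; c < clientCount; c++) {
            load.put(c, 0);
        }
        for (int t = 0; t < taskCount; t++) {
            load.merge(t % clientCount, 1, Integer::sum);
        }
        return load;
    }

    public static void main(String[] args) {
        // More clients than tasks: each client gets at most one task,
        // even if one client is overprovisioned with idle threads.
        Map<Integer, Integer> load = assign(4, 10);
        System.out.println("max=" + Collections.max(load.values())
                + " min=" + Collections.min(load.values())); // max=1 min=0
    }
}
```

With more tasks than clients (e.g. 10 tasks over 4 clients) the same loop yields loads of 3,3,2,2 — still within one task of each other, which is the kind of client-level evenness the test names describe.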
[jira] [Commented] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
[ https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110643#comment-17110643 ] Bill Bejeck commented on KAFKA-7271: [~mjsax] I don't think I'll have time for this one, so I've unassigned myself. > Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers > --- > > Key: KAFKA-7271 > URL: https://issues.apache.org/jira/browse/KAFKA-7271 > Project: Kafka > Issue Type: Improvement > Components: streams, system tests >Reporter: John Roesler >Priority: Blocker > Fix For: 2.6.0 > > > Fix in the oldest branch that ignores the test and cherry-pick forward. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
[ https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-7271: -- Assignee: (was: Bill Bejeck) > Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers > --- > > Key: KAFKA-7271 > URL: https://issues.apache.org/jira/browse/KAFKA-7271 > Project: Kafka > Issue Type: Improvement > Components: streams, system tests >Reporter: John Roesler >Priority: Blocker > Fix For: 2.6.0 > > > Fix in the oldest branch that ignores the test and cherry-pick forward.
[GitHub] [kafka] guozhangwang merged pull request #8669: MINOR: consolidate processor context for active/standby
guozhangwang merged pull request #8669: URL: https://github.com/apache/kafka/pull/8669
[jira] [Commented] (KAFKA-10010) Should close standby task for safety during HandleLostAll
[ https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110638#comment-17110638 ] Boyang Chen commented on KAFKA-10010: - For more context, the [reason|https://github.com/apache/kafka/pull/8440/files#r407722022] we have to keep the txn commit before handling task corruption is that otherwise under EOS beta the stream thread could actually abort other healthy tasks. > Should close standby task for safety during HandleLostAll > - > > Key: KAFKA-10010 > URL: https://issues.apache.org/jira/browse/KAFKA-10010 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > The current lost all logic doesn't close standby task, which could > potentially lead to a tricky condition like below: > 1. The standby task was initializing as `CREATED` state, and task corrupted > exception was thrown from registerStateStores > 2. The task corrupted exception was caught, and do a non-affected task commit > 3. The task commit failed due to task migrated exception > 4. The handleLostAll didn't close the standby task, leaving it as CREATED > state > 5. Next rebalance complete, the same task was assigned back as standby task. > 6. 
Illegal Argument exception caught : > {code:java} > [2020-05-16T11:56:18-07:00] > (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 > 18:56:18,050] ERROR > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > stream-thread > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > Encountered the following exception during processing and the thread is going > to shut down: (org.apache.kafka.streams.processor.internals.StreamThread) > [2020-05-16T11:56:18-07:00] > (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) > java.lang.IllegalArgumentException: stream-thread > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already > been registered. > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269) > at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112) > at > org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > 
org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85) > at > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82) > at > org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89) > at > org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #8669: MINOR: consolidate processor context for active/standby
ableegoldman commented on pull request #8669: URL: https://github.com/apache/kafka/pull/8669#issuecomment-630448469 Failed due to flaky `EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta` and `ConnectorTopicsIntegrationTest.testGetActiveTopics`
[jira] [Comment Edited] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
[ https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110633#comment-17110633 ] Sophie Blee-Goldman edited comment on KAFKA-10017 at 5/18/20, 9:36 PM: --- With injectError = false: h3. Stacktrace java.lang.AssertionError: Did not receive all 10 records from topic multiPartitionOutputTopic within 6 ms Expected: is a value equal to or greater than <10> but: <5> was less than <10> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427) was (Author: ableegoldman): h3. 
Stacktrace java.lang.AssertionError: Did not receive all 10 records from topic multiPartitionOutputTopic within 6 ms Expected: is a value equal to or greater than <10> but: <5> was less than <10> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427) > Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta > --- > > Key: KAFKA-10017 > URL: https://issues.apache.org/jira/browse/KAFKA-10017 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: flaky-test, unit-test > > Creating a new ticket for this since the root cause is different than > https://issues.apache.org/jira/browse/KAFKA-9966 > With injectError = true: > h3. 
Stacktrace > java.lang.AssertionError: Did not receive all 20 records from topic > multiPartitionOutputTopic within 6 ms Expected: is a value equal to or > greater than <20> but: <15> was less than <20> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
[ https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-10017: Description: Creating a new ticket for this since the root cause is different than https://issues.apache.org/jira/browse/KAFKA-9966 With injectError = true: h3. Stacktrace java.lang.AssertionError: Did not receive all 20 records from topic multiPartitionOutputTopic within 6 ms Expected: is a value equal to or greater than <20> but: <15> was less than <20> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427) was: Creating a new ticket for this since the root cause is different than https://issues.apache.org/jira/browse/KAFKA-9966 h3. 
Stacktrace java.lang.AssertionError: Did not receive all 20 records from topic multiPartitionOutputTopic within 6 ms Expected: is a value equal to or greater than <20> but: <15> was less than <20> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427) > Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta > --- > > Key: KAFKA-10017 > URL: https://issues.apache.org/jira/browse/KAFKA-10017 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: flaky-test, unit-test > > Creating a new ticket for this since the root cause is different than > https://issues.apache.org/jira/browse/KAFKA-9966 > With injectError = true: > h3. 
Stacktrace > java.lang.AssertionError: Did not receive all 20 records from topic > multiPartitionOutputTopic within 6 ms Expected: is a value equal to or > greater than <20> but: <15> was less than <20> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427) -- This message was sent by Atlassian Jira (v8.3.4#803005
[jira] [Commented] (KAFKA-10010) Should close standby task for safety during HandleLostAll
[ https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110636#comment-17110636 ] Boyang Chen commented on KAFKA-10010: - Had offline discussion with the team, so far some action items: # Make the state store registration idempotent to unblock the trunk soak # Add a logic to avoid aborting the txn when the task is in initialization phase (Get a separate ticket) > Should close standby task for safety during HandleLostAll > - > > Key: KAFKA-10010 > URL: https://issues.apache.org/jira/browse/KAFKA-10010 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > The current lost all logic doesn't close standby task, which could > potentially lead to a tricky condition like below: > 1. The standby task was initializing as `CREATED` state, and task corrupted > exception was thrown from registerStateStores > 2. The task corrupted exception was caught, and do a non-affected task commit > 3. The task commit failed due to task migrated exception > 4. The handleLostAll didn't close the standby task, leaving it as CREATED > state > 5. Next rebalance complete, the same task was assigned back as standby task. > 6. 
Illegal Argument exception caught : > {code:java} > [2020-05-16T11:56:18-07:00] > (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 > 18:56:18,050] ERROR > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > stream-thread > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > Encountered the following exception during processing and the thread is going > to shut down: (org.apache.kafka.streams.processor.internals.StreamThread) > [2020-05-16T11:56:18-07:00] > (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) > java.lang.IllegalArgumentException: stream-thread > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already > been registered. > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269) > at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112) > at > org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > 
org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85) > at > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82) > at > org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89) > at > org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
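The first action item above — making state store registration idempotent — amounts to treating a second registration of an already-known store name as a no-op instead of throwing the IllegalArgumentException seen in the stack trace. A minimal, hypothetical sketch of that idea (not the actual ProcessorStateManager code; class and method names are invented):

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentStoreRegistry {
    private final Map<String, Object> stores = new HashMap<>();

    // Registering the same store name twice is skipped rather than fatal,
    // so a standby task re-initialized after an incomplete close does not
    // crash the stream thread with "Store ... has already been registered".
    public boolean register(String name, Object store) {
        if (stores.containsKey(name)) {
            return false; // already registered: no-op instead of throwing
        }
        stores.put(name, store);
        return true;
    }
}
```

The alternative (and the eventual safety fix the ticket title suggests) is to make sure handleLostAll closes standby tasks so the duplicate registration never happens in the first place; the sketch only illustrates the stop-gap.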
[jira] [Updated] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
[ https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-10017: Summary: Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta (was: Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]) > Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta > --- > > Key: KAFKA-10017 > URL: https://issues.apache.org/jira/browse/KAFKA-10017 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: flaky-test, unit-test > > Creating a new ticket for this since the root cause is different than > https://issues.apache.org/jira/browse/KAFKA-9966 > h3. Stacktrace > java.lang.AssertionError: Did not receive all 20 records from topic > multiPartitionOutputTopic within 6 ms Expected: is a value equal to or > greater than <20> but: <15> was less than <20> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) > at > 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)
[jira] [Commented] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
[ https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110633#comment-17110633 ] Sophie Blee-Goldman commented on KAFKA-10017: - h3. Stacktrace java.lang.AssertionError: Did not receive all 10 records from topic multiPartitionOutputTopic within 6 ms Expected: is a value equal to or greater than <10> but: <5> was less than <10> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427) > Flaky Test > EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] > - > > Key: KAFKA-10017 > URL: https://issues.apache.org/jira/browse/KAFKA-10017 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: flaky-test, unit-test > > Creating a new ticket for this since the root cause is different than > https://issues.apache.org/jira/browse/KAFKA-9966 > h3. 
Stacktrace > java.lang.AssertionError: Did not receive all 20 records from topic > multiPartitionOutputTopic within 6 ms Expected: is a value equal to or > greater than <20> but: <15> was less than <20> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] navina edited a comment on pull request #8684: KAFKA-10012 Reducing memory overhead associated with strings in Metri…
navina edited a comment on pull request #8684: URL: https://github.com/apache/kafka/pull/8684#issuecomment-630444877 @ijuma What issues with `intern()` are you referring to? I know that there can be a performance hit when there are a lot of interned strings. I believe the string interning mechanism has been improved in later versions of Java, such as JDK 8/9. I would like to understand the concern better before removing the string interning shown here. Thanks for the quick feedback!
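The memory saving under discussion in this PR comes from `String.intern()` returning one canonical instance per distinct string value, so repeated metric-name strings share storage instead of each holding its own copy. A small stand-alone illustration (the metric-name literal is invented, and this is not Kafka code):

```java
public class InternDemo {
    public static void main(String[] args) {
        // Two equal but distinct String objects on the heap:
        String a = new String("kafka.consumer:client-id");
        String b = new String("kafka.consumer:client-id");
        System.out.println(a == b);                   // false: separate objects
        // intern() maps both to the single pooled instance:
        System.out.println(a.intern() == b.intern()); // true: one shared copy
    }
}
```

The performance concern raised in the thread is that a very large intern pool has lookup and footprint costs of its own, which is why reviewers sometimes prefer an application-level cache over the JVM pool.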
[jira] [Resolved] (KAFKA-9292) KIP-551: Expose disk read and write metrics
[ https://issues.apache.org/jira/browse/KAFKA-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-9292. - Fix Version/s: 2.6.0 Resolution: Fixed > KIP-551: Expose disk read and write metrics > --- > > Key: KAFKA-9292 > URL: https://issues.apache.org/jira/browse/KAFKA-9292 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Fix For: 2.6.0 > > > It's often helpful to know how many bytes Kafka is reading and writing from > the disk. The reason is because when disk access is required, there may be > some impact on latency and bandwidth. We currently don't have a metric that > measures this directly. It would be useful to add one. > See > https://cwiki.apache.org/confluence/display/KAFKA/KIP-551%3A+Expose+disk+read+and+write+metrics
[GitHub] [kafka] cmccabe merged pull request #8569: KIP-551: Expose disk read and write metrics
cmccabe merged pull request #8569: URL: https://github.com/apache/kafka/pull/8569
[jira] [Created] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
Sophie Blee-Goldman created KAFKA-10017: --- Summary: Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] Key: KAFKA-10017 URL: https://issues.apache.org/jira/browse/KAFKA-10017 Project: Kafka Issue Type: Bug Components: streams Reporter: Sophie Blee-Goldman Creating a new ticket for this since the root cause is different than https://issues.apache.org/jira/browse/KAFKA-9966 h3. Stacktrace java.lang.AssertionError: Did not receive all 20 records from topic multiPartitionOutputTopic within 6 ms Expected: is a value equal to or greater than <20> but: <15> was less than <20> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427) -- This message was sent by Atlassian Jira (v8.3.4#803005)
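The assertion that fails in this flaky test is the generic poll-until-enough-records-or-timeout pattern behind `waitUntilMinKeyValueRecordsReceived`. A hypothetical minimal version of such a wait loop (invented names; not the actual IntegrationTestUtils code):

```java
import java.util.function.IntSupplier;

public class WaitUntil {
    // Poll countSupplier until it reports at least minRecords, or fail with
    // an AssertionError (mirroring the test's failure message) at timeoutMs.
    public static int waitForMinRecords(IntSupplier countSupplier,
                                        int minRecords,
                                        long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int seen = countSupplier.getAsInt();
        while (seen < minRecords) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("Did not receive all " + minRecords
                        + " records within " + timeoutMs + " ms; got " + seen);
            }
            try {
                Thread.sleep(10); // small poll interval between re-checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", e);
            }
            seen = countSupplier.getAsInt();
        }
        return seen;
    }
}
```

A loop like this makes the test's failure mode clear: when the application under test commits fewer records than expected before the deadline, the wait gives up and reports how many records it did see — exactly the "<15> was less than <20>" shape in the stack trace.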
[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams
[ https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110609#comment-17110609 ] Jorge Esteban Quilcate Otoya commented on KAFKA-4327:

[~mjsax] [~guozhang] I'd like to help close this one, as things have changed since it was created:
* the zookeeper dependency has been removed, and
* the zookeeper argument was deprecated.

This tool carries a dependency on an argument parser that I'm not sure we would like to pull into the streams module. I'd like to propose and agree on the following changes before moving forward:
* move StreamsResetter to the `tools` module
* translate the jopt parser (scala) into argparser (java)
* remove the zookeeper parameter

If we agree on this, I can draft a small KIP to get this done.

> Move Reset Tool from core to streams
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Jorge Esteban Quilcate Otoya
> Priority: Blocker
> Labels: needs-kip
> Fix For: 3.0.0
>
> This is a follow-up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, the Kafka Streams Application Reset Tool is part of the {{core}} module due to the ZK dependency. After KIP-4 got merged, this dependency can be dropped and the Reset Tool can be moved to the {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which refers to ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker backward compatibility -- not all brokers support the "topic delete request" and thus the reset tool will not be backward compatible with all broker versions.
[GitHub] [kafka] abbccdda commented on a change in pull request #8680: KAFKA-9755: Implement read path for feature versioning system (KIP-584)
abbccdda commented on a change in pull request #8680: URL: https://github.com/apache/kafka/pull/8680#discussion_r426806321

## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java

+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features Map of feature name to type of VersionRange, as the backing data structure
+     *                 for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features Map of feature name to VersionRange, as the backing data structure
+     *                 for the Features object.
+     * @return Returns a new Features object representing "supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features Map of feature name to VersionLevelRange, as the backing data structure
+     *                 for the Features object.
+     * @return Returns a new Features object representing "finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {

Review comment (on emptyFinalizedFeatures): Is this function only used in unit test?

Review comment (on `return new Features<VersionRange>(features);`): Could be simplified as `new Features<>`.

Review comment (on the class javadoc): nit: we could use {@link VersionRangeType} to reference the classes.
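The `new Features<>` diamond suggestion and the immutable, factory-only construction described in the quoted javadoc can be sketched as follows. `VersionRange` here is a hypothetical stand-in, since only fragments of the real class are quoted in the review.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the version-range types referenced in the diff.
class VersionRange {
    final long min, max;
    VersionRange(long min, long max) { this.min = min; this.max = max; }
}

// Immutable feature-name -> version-range dictionary, instantiated only via factories.
final class Features<VersionRangeType extends VersionRange> {
    private final Map<String, VersionRangeType> features;

    private Features(Map<String, VersionRangeType> features) {
        // Defensive copy keeps the dictionary immutable even if the caller later mutates its map.
        this.features = Collections.unmodifiableMap(new HashMap<>(Objects.requireNonNull(features)));
    }

    public static <T extends VersionRange> Features<T> supportedFeatures(Map<String, T> features) {
        // The diamond form suggested in the review: the type argument is inferred.
        return new Features<>(features);
    }

    public VersionRangeType get(String feature) {
        return features.get(feature);
    }
}

public class FeaturesDemo {
    public static void main(String[] args) {
        Map<String, VersionRange> m = new HashMap<>();
        m.put("group_coordinator", new VersionRange(1, 3));
        Features<VersionRange> f = Features.supportedFeatures(m);
        m.clear(); // does not affect the Features instance
        System.out.println(f.get("group_coordinator").max); // 3
    }
}
```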
[GitHub] [kafka] hachikuji commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state
hachikuji commented on a change in pull request #8238: URL: https://github.com/apache/kafka/pull/8238#discussion_r426867775

## File path: clients/src/main/resources/common/message/ListGroupsRequest.json

  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 is the first flexible version.
- "validVersions": "0-3",
+ //
+ // Version 4 adds the States flexible field (KIP-518).
+ "validVersions": "0-4",
  "flexibleVersions": "3+",
  "fields": [
+   { "name": "States", "type": "[]string", "versions": "4+", "tag": 0, "taggedVersions": "4+",

Review comment: Sorry I missed this from the discussion, but why are we bumping the version if we are only adding tagged fields? Is it so that we can detect whether the capability is supported? If so, then I wonder why we don't make this a regular field.

## File path: clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java

 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+    private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+    /**
+     * Only groups in these states will be returned by listConsumerGroups()
+     * If not set, all groups are returned without their states
+     * throw IllegalArgumentException if states is empty
+     */
+    public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
+        if (states == null || states.isEmpty()) {
+            throw new IllegalArgumentException("states should not be null or empty");
+        }
+        this.states = Optional.of(states);
+        return this;
+    }
+
+    /**
+     * All groups with their states will be returned by listConsumerGroups()
+     */
+    public ListConsumerGroupsOptions inAnyState() {
+        this.states = Optional.of(EnumSet.allOf(ConsumerGroupState.class));

Review comment (on the inStates javadoc): Probably worth adding a comment about broker compatibility with this API.

Review comment (on inAnyState): Hmm.. We have an `UNKNOWN` state in `ConsumerGroupState` in case the group coordinator adds a new state that the client isn't aware of. Currently we're going to pass this through the request, which is a bit odd. Furthermore, if the coordinator _does_ add new states, we will be unable to see them using this API. I think it might be better to use a `null` list of states in the request to indicate that any state is needed.

## File path: core/src/main/scala/kafka/server/KafkaApis.scala

 def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
-  val (error, groups) = groupCoordinator.handleListGroups()
+  val listGroupsRequest = request.body[ListGroupsRequest]
+  val states = listGroupsRequest.data.states.asScala.toList
+
+  def createResponse(throttleMs: Int, groups: List[GroupOverview], error: Errors): AbstractResponse = {
+    new ListGroupsResponse(new ListGroupsResponseData()
+      .setErrorCode(error.code)
+      .setGroups(groups.map { group =>
+        val listedGroup = new ListGroupsResponseData.ListedGroup()
+          .setGroupId(group.groupId)
+          .setProtocolType(group.protocolType)
+        if (!states.isEmpty)

Review comment: Why don't we always return the state? I don't think overhead is a huge concern for an api like this.
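To make the discussion concrete, here is a compilable sketch of the option class along the lines quoted in the diff, with a stand-in `ConsumerGroupState` enum (the real one lives in `org.apache.kafka.common`). It also makes the reviewer's `UNKNOWN` concern visible: `EnumSet.allOf` necessarily includes `UNKNOWN`, so `inAnyState()` would send it over the wire.

```java
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for org.apache.kafka.common.ConsumerGroupState.
enum ConsumerGroupState { PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY, UNKNOWN }

public class ListConsumerGroupsOptionsSketch {
    private Optional<Set<ConsumerGroupState>> states = Optional.empty();

    // Only groups in these states are returned; null/empty is rejected, per the quoted javadoc.
    public ListConsumerGroupsOptionsSketch inStates(Set<ConsumerGroupState> states) {
        if (states == null || states.isEmpty())
            throw new IllegalArgumentException("states should not be null or empty");
        this.states = Optional.of(states);
        return this;
    }

    // Request all states explicitly -- note this includes UNKNOWN, which is the review's concern.
    public ListConsumerGroupsOptionsSketch inAnyState() {
        this.states = Optional.of(EnumSet.allOf(ConsumerGroupState.class));
        return this;
    }

    public Optional<Set<ConsumerGroupState>> states() { return states; }

    public static void main(String[] args) {
        ListConsumerGroupsOptionsSketch options = new ListConsumerGroupsOptionsSketch().inAnyState();
        System.out.println(options.states().get().contains(ConsumerGroupState.UNKNOWN)); // true
    }
}
```

The reviewer's alternative (a `null`/absent state list meaning "any state") would avoid enumerating states the client happens to know about at compile time.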
[jira] [Created] (KAFKA-10016) Support For Purge Topic
David Mollitor created KAFKA-10016:
Summary: Support For Purge Topic
Key: KAFKA-10016
URL: https://issues.apache.org/jira/browse/KAFKA-10016
Project: Kafka
Issue Type: Improvement
Reporter: David Mollitor

Some discussions exist about how to purge a topic. Please add native support for this operation. Is there a "starting offset" for each topic? Such a vehicle would allow this value to be easily set to the current offset, and the brokers would skip (and clean) everything before it.

https://stackoverflow.com/questions/16284399/purge-kafka-topic

{code:none}
kafka-topics --topic mytopic --purge
{code}
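Pending a native `--purge`, the closest existing primitive is the Admin API's `deleteRecords`, which advances each partition's log start offset so the brokers drop (and clean) everything below it; that offset is exactly the "starting offset" the ticket asks about. The sketch below stays broker-free: it computes the per-partition cutoffs one would pass as `RecordsToDelete.beforeOffset(...)`, with partitions keyed by index for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class PurgeSketch {
    // Deleting "before the current end offset" removes every record currently in each partition.
    static Map<Integer, Long> purgeCutoffs(Map<Integer, Long> endOffsets) {
        Map<Integer, Long> cutoffs = new HashMap<>();
        endOffsets.forEach(cutoffs::put);
        return cutoffs;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = new HashMap<>();
        end.put(0, 42L);
        end.put(1, 7L);
        System.out.println(purgeCutoffs(end));
        // Against a live cluster this would continue roughly as:
        //   admin.deleteRecords(Map.of(new TopicPartition("mytopic", 0),
        //                              RecordsToDelete.beforeOffset(42L)));
    }
}
```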
[GitHub] [kafka] mjsax merged pull request #8687: MINOR: updated MacOS compatibility statement for RocksDB
mjsax merged pull request #8687: URL: https://github.com/apache/kafka/pull/8687
[GitHub] [kafka] ijuma commented on a change in pull request #8685: KAFKA-10014 Always try to close all channels in Selector#close
ijuma commented on a change in pull request #8685: URL: https://github.com/apache/kafka/pull/8685#discussion_r426850232

## File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java

 @Override
 public void close() {
     List<String> connections = new ArrayList<>(channels.keySet());
-    try {
-        for (String id : connections)
-            close(id);
-    } finally {
-        // If there is any exception thrown in close(id), we should still be able
-        // to close the remaining objects, especially the sensors because keeping
-        // the sensors may lead to failure to start up the ReplicaFetcherThread if
-        // the old sensors with the same names has not yet been cleaned up.
-        AtomicReference<Throwable> firstException = new AtomicReference<>();
-        Utils.closeQuietly(nioSelector, "nioSelector", firstException);
-        Utils.closeQuietly(sensors, "sensors", firstException);
-        Utils.closeQuietly(channelBuilder, "channelBuilder", firstException);
-        Throwable exception = firstException.get();
-        if (exception instanceof RuntimeException && !(exception instanceof SecurityException)) {
-            throw (RuntimeException) exception;
-        }
-    }
+    AtomicReference<Throwable> firstException = new AtomicReference<>();

Review comment: Have we considered using `Utils.closeAll` instead of multiple `closeQuietly`?
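The pattern this diff converges on (close every resource, remember only the first failure, and rethrow at the end instead of aborting on the first `close()`) can be sketched independently of `Selector`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class CloseAll {
    static void closeAll(List<AutoCloseable> resources) throws Exception {
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        for (AutoCloseable r : resources) {
            try {
                r.close();
            } catch (Throwable t) {
                // compareAndSet keeps only the first error, in the spirit of Utils.closeQuietly.
                firstException.compareAndSet(null, t);
            }
        }
        Throwable exception = firstException.get();
        if (exception != null)
            throw new Exception("Failed to close resources", exception);
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        List<AutoCloseable> resources = List.of(
            () -> { closed.add("a"); throw new RuntimeException("boom"); },
            () -> closed.add("b"));
        try {
            closeAll(resources);
        } catch (Exception e) {
            System.out.println(closed); // [a, b] -- "b" was still closed despite "a" failing
        }
    }
}
```

`Utils.closeAll`, as suggested in the review, packages essentially this loop-and-rethrow behavior in one call.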
[GitHub] [kafka] ijuma commented on pull request #8684: KAFKA-10012 Reducing memory overhead associated with strings in Metri…
ijuma commented on pull request #8684: URL: https://github.com/apache/kafka/pull/8684#issuecomment-630391729 Thanks for the PR. Java's built-in string interning mechanism is known to have issues. Not sure we want to do that. Maybe we can remove that part of the change from this PR?
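For reference, the usual alternative to `String.intern()` is application-level deduplication through a map the application owns, which avoids the JVM-managed intern table the comment alludes to. A minimal sketch (illustrative only, not what the PR implements):

```java
import java.util.concurrent.ConcurrentHashMap;

public class Interner {
    private static final ConcurrentHashMap<String, String> POOL = new ConcurrentHashMap<>();

    // Returns a canonical instance for s: the first instance seen wins and is shared afterwards.
    static String intern(String s) {
        String prev = POOL.putIfAbsent(s, s);
        return prev != null ? prev : s;
    }

    public static void main(String[] args) {
        String a = intern(new String("metric-name"));
        String b = intern(new String("metric-name"));
        System.out.println(a == b); // true: both callers share one instance
    }
}
```

Unlike `String.intern()`, this pool can be sized, cleared, or scoped (e.g. per MetricName registry), at the cost of holding strong references until the map is cleared.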
[GitHub] [kafka] cmccabe commented on pull request #8675: KAFKA-10004: Fix the default broker configs cannot be displayed when using kafka-configs.sh --describe
cmccabe commented on pull request #8675: URL: https://github.com/apache/kafka/pull/8675#issuecomment-630389772 ok to test
[GitHub] [kafka] apovzner commented on pull request #8650: MINOR: Added unit tests for ConnectionQuotas
apovzner commented on pull request #8650: URL: https://github.com/apache/kafka/pull/8650#issuecomment-630379942 It looks like the build couldn't even run tests:
```
15:15:09 ERROR: Error cloning remote repo 'origin'
...
15:15:18 stderr: fatal: Unable to look up github.com (port 9418) (Name or service not known)
```
[GitHub] [kafka] ijuma closed pull request #8688: MINOR: Introduce separate methods in KafkaApis for consumer and follower fetch handling
ijuma closed pull request #8688: URL: https://github.com/apache/kafka/pull/8688
[GitHub] [kafka] ijuma commented on pull request #8688: MINOR: Introduce separate methods in KafkaApis for consumer and follower fetch handling
ijuma commented on pull request #8688: URL: https://github.com/apache/kafka/pull/8688#issuecomment-630376046 Closing this for now as there may be a better way to achieve this.
[GitHub] [kafka] guozhangwang commented on pull request #8669: MINOR: consolidate processor context for active/standby
guozhangwang commented on pull request #8669: URL: https://github.com/apache/kafka/pull/8669#issuecomment-630370157 test this please
[jira] [Commented] (KAFKA-10010) Should close standby task for safety during HandleLostAll
[ https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110522#comment-17110522 ] Sophie Blee-Goldman commented on KAFKA-10010: - When I first started looking into the store registration and initialization logic for that PR, I remember thinking there was a bug since we would attempt to re-register stores if we hit an exception halfway through registration. I snooped around and it seemed like there wasn't really a way to hit this bug, but I fixed it anyways. Seems like there actually was a way to hit this bug after all, so nice catch [~bchen225242] > Should close standby task for safety during HandleLostAll > - > > Key: KAFKA-10010 > URL: https://issues.apache.org/jira/browse/KAFKA-10010 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > The current lost all logic doesn't close standby task, which could > potentially lead to a tricky condition like below: > 1. The standby task was initializing as `CREATED` state, and task corrupted > exception was thrown from registerStateStores > 2. The task corrupted exception was caught, and do a non-affected task commit > 3. The task commit failed due to task migrated exception > 4. The handleLostAll didn't close the standby task, leaving it as CREATED > state > 5. Next rebalance complete, the same task was assigned back as standby task. > 6. 
Illegal Argument exception caught : > {code:java} > [2020-05-16T11:56:18-07:00] > (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 > 18:56:18,050] ERROR > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > stream-thread > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > Encountered the following exception during processing and the thread is going > to shut down: (org.apache.kafka.streams.processor.internals.StreamThread) > [2020-05-16T11:56:18-07:00] > (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) > java.lang.IllegalArgumentException: stream-thread > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already > been registered. > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269) > at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112) > at > org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > 
org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85) > at > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82) > at > org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89) > at > org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509) > {code}
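The failure mode in the quoted stack trace, and the skip-if-already-registered fix mentioned in the comments, reduce to a small sketch. This is a hypothetical class, not the real `ProcessorStateManager`:

```java
import java.util.HashMap;
import java.util.Map;

public class StateManagerSketch {
    private final Map<String, Object> stores = new HashMap<>();

    // Current behavior: a second registration of the same store name throws,
    // which is the IllegalArgumentException seen in the stack trace.
    void registerStore(String name, Object store) {
        if (stores.containsKey(name))
            throw new IllegalArgumentException("Store " + name + " has already been registered.");
        stores.put(name, store);
    }

    // Idempotent variant: re-registering an already-registered store is a no-op,
    // so a task re-assigned after a failed initialization does not blow up.
    void registerStoreIfAbsent(String name, Object store) {
        stores.putIfAbsent(name, store);
    }

    public static void main(String[] args) {
        StateManagerSketch manager = new StateManagerSketch();
        manager.registerStore("window-store", new Object());
        try {
            manager.registerStore("window-store", new Object());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        manager.registerStoreIfAbsent("window-store", new Object()); // no exception
    }
}
```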
[jira] [Commented] (KAFKA-10010) Should close standby task for safety during HandleLostAll
[ https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110518#comment-17110518 ] Sophie Blee-Goldman commented on KAFKA-10010: It's possible the active <-> standby task conversion PR would actually fix this on the side, as it skips re-registering any store that's already registered. I'd like to avoid closing standbys during handleLostAll since this will completely clear out any in-memory stores, for example
[GitHub] [kafka] ijuma opened a new pull request #8688: MINOR: Introduce separate methods in KafkaApis for consumer and follower fetch handling
ijuma opened a new pull request #8688: URL: https://github.com/apache/kafka/pull/8688

This is a bit odd in that it's not needed from a semantics perspective, but it would make it much easier to distinguish the cost of follower fetches versus consumer fetches when profiling.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cmccabe commented on pull request #8569: KIP-551: Expose disk read and write metrics
cmccabe commented on pull request #8569: URL: https://github.com/apache/kafka/pull/8569#issuecomment-630352959 @mumrah : Good question. I don't think anyone has looked at Sigar. I guess the question is whether we want to get into the business of doing general-purpose node monitoring. I think many people would say no. We're doing this metric mainly because it's very simple to check, and also very impactful for Kafka (starting heavy disk reads often correlates with performance tanking).
[jira] [Commented] (KAFKA-6579) Consolidate window store and session store unit tests into a single class
[ https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110508#comment-17110508 ] Sophie Blee-Goldman commented on KAFKA-6579:

No, I looked into it but the scope is nontrivial. I'll unassign it and maybe someone from the community can pick it up ^^

> Consolidate window store and session store unit tests into a single class
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Sophie Blee-Goldman
> Priority: Major
> Labels: newbie, unit-test
>
> For key-value stores, we have an {{AbstractKeyValueStoreTest}} that is shared among all its implementations; however, for window and session stores, each class has its own independent unit test classes that do not share the test coverage. In fact, many of these test classes share the same unit test functions (e.g. {{RocksDBWindowStoreTest}}, {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key-value stores and consolidate these test functions into a shared base class.
[jira] [Assigned] (KAFKA-6579) Consolidate window store and session store unit tests into a single class
[ https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-6579: Assignee: (was: Sophie Blee-Goldman)