[GitHub] [kafka] fml2 commented on a change in pull request #9607: [KAFKA-10722] doc: Described the types of the stores used

2020-11-17 Thread GitBox


fml2 commented on a change in pull request #9607:
URL: https://github.com/apache/kafka/pull/9607#discussion_r525874521



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -870,6 +870,12 @@ Stateful 
transformations depend on state for processing inputs and producing outputs and 
require a state store associated with the stream processor. For 
example, in aggregating operations, a windowing state store is used to collect 
the latest aggregation results per
 window. In join operations, a windowing state store is 
used to collect all of the records received so far within the
 defined window boundary.
+   Note: The following store types are used regardless of any store type specified via the parameter materialized:
+   
+   non-windowed aggregations and plain KTables use TimestampedKeyValueStores
+   time-windowed aggregations and kstream-kstream joins 
use TimestampedWindowStores
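To illustrate the note above: the practical difference is that a timestamped store keeps, per key, the value together with the timestamp of the record that last updated it. A minimal plain-Java sketch of the idea (the `ValueAndTimestamp` class below is a hypothetical stand-in for illustration, not the real Kafka Streams API):

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampedStoreSketch {
    // Hypothetical stand-in for Kafka Streams' ValueAndTimestamp: the
    // stored value plus the timestamp of the record that produced it.
    static final class ValueAndTimestamp<V> {
        final V value;
        final long timestamp;
        ValueAndTimestamp(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    public static void main(String[] args) {
        // A timestamped key-value store is conceptually a map from key to
        // (value, timestamp), not just key to value.
        Map<String, ValueAndTimestamp<Long>> store = new HashMap<>();
        // A non-windowed aggregation writes the new aggregate together
        // with the input record's timestamp.
        store.put("sum", new ValueAndTimestamp<>(42L, 1605L));
        ValueAndTimestamp<Long> vt = store.get("sum");
        System.out.println(vt.value + " @ " + vt.timestamp); // prints 42 @ 1605
    }
}
```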

Review comment:
   OK, will do.
   
   Why did the build fail?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] fml2 commented on a change in pull request #9607: [KAFKA-10722] doc: Described the types of the stores used

2020-11-17 Thread GitBox


fml2 commented on a change in pull request #9607:
URL: https://github.com/apache/kafka/pull/9607#discussion_r525874220



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -870,6 +870,12 @@ Stateful 
transformations depend on state for processing inputs and producing outputs and 
require a state store associated with the stream processor. For 
example, in aggregating operations, a windowing state store is used to collect 
the latest aggregation results per
 window. In join operations, a windowing state store is 
used to collect all of the records received so far within the
 defined window boundary.
+   Note: The following store types are used regardless of any store type specified via the parameter materialized:
+   
+   non-windowed aggregations and plain KTables use TimestampedKeyValueStores

Review comment:
   OK, will do.









[GitHub] [kafka] fml2 commented on a change in pull request #9606: [KAFKA-10722] doc: Improve JavaDoc for KGroupedStream.aggregate

2020-11-17 Thread GitBox


fml2 commented on a change in pull request #9606:
URL: https://github.com/apache/kafka/pull/9606#discussion_r525873831



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
##
@@ -438,7 +439,8 @@
  * query the value of the key on a parallel running instance of your Kafka 
Streams application.
  *
  * 
- * For failure and recovery the store will be backed by an internal 
changelog topic that will be created in Kafka.
+ * For failure and recovery the store (which always will be of type {@link 
TimestampedKeyValueStore} -- regardless of what
+ * is specified in the parameter {@materialized}) will be backed by an 
internal changelog topic that will be created in Kafka.

Review comment:
   Yes, actually, I wanted to write this but somehow... I will correct it. Should I then squash the commits?









[GitHub] [kafka] fml2 commented on a change in pull request #9606: [KAFKA-10722] doc: Improve JavaDoc for KGroupedStream.aggregate

2020-11-17 Thread GitBox


fml2 commented on a change in pull request #9606:
URL: https://github.com/apache/kafka/pull/9606#discussion_r525873511



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
##
@@ -381,7 +381,8 @@
  * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
  *
  * 
- * For failure and recovery the store will be backed by an internal 
changelog topic that will be created in Kafka.
+ * For failure and recovery the store (which always will be of type {@link 
TimestampedKeyValueStore}) will be backed by
+ * an internal changelog topic that will be created in Kafka.

Review comment:
   Yes, this should probably be done. But since I don't know how they work or what to write there, I'd prefer to do it in another PR (or have someone else do it).









[GitHub] [kafka] chia7712 commented on pull request #9599: MINOR: Include connector name in error message

2020-11-17 Thread GitBox


chia7712 commented on pull request #9599:
URL: https://github.com/apache/kafka/pull/9599#issuecomment-729500064


   @C0urante Thanks for your patch. Merged to trunk!







[GitHub] [kafka] chia7712 merged pull request #9599: MINOR: Include connector name in error message

2020-11-17 Thread GitBox


chia7712 merged pull request #9599:
URL: https://github.com/apache/kafka/pull/9599


   







[GitHub] [kafka] chia7712 opened a new pull request #9610: MINOR: remove "gradle wrapper" from travis.yml

2020-11-17 Thread GitBox


chia7712 opened a new pull request #9610:
URL: https://github.com/apache/kafka/pull/9610


   ```
   > Failed to apply plugin [class 
'com.github.spotbugs.snom.SpotBugsBasePlugin']
  > Gradle version Gradle 5.1.1 is unsupported. Please use Gradle 5.6 or 
later.
   ```
   
   ```com.github.spotbugs.snom.SpotBugsBasePlugin``` requires Gradle 5.6+, but the Gradle versions supported by Travis are 4.0 and 5.1.1 (https://docs.travis-ci.com/user/reference/trusty/#gradle-version and https://docs.travis-ci.com/user/reference/xenial/).
   
   However, we don't need to call ```gradle wrapper``` at all, since ```gradlew``` already exists in our project.
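   A hypothetical sketch of what the resulting `.travis.yml` fragment might look like (the exact keys and commands here are assumptions for illustration, not the PR's actual diff):

   ```yaml
   # Before, the build ran "gradle wrapper" with Travis's system Gradle
   # (5.1.1), which the SpotBugs plugin rejects:
   #
   #   install:
   #     - gradle wrapper
   #
   # After, the checked-in wrapper is invoked directly; gradlew bootstraps
   # the project's pinned Gradle version (5.6+) on first use.
   install: true
   script:
     - ./gradlew --no-daemon test
   ```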
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ableegoldman commented on pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#issuecomment-729475252


   Merged to trunk 🥳 







[GitHub] [kafka] ableegoldman merged pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman merged pull request #9487:
URL: https://github.com/apache/kafka/pull/9487


   







[GitHub] [kafka] ableegoldman commented on pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#issuecomment-729474257


   System tests passed, all three Java builds passed. Merging now







[GitHub] [kafka] kowshik commented on pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

2020-11-17 Thread GitBox


kowshik commented on pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#issuecomment-729473539


   Thanks for the review, @ijuma! I have addressed the comments in 8716429b48cad8af6ad73109c1d9f7442823c02f.







[GitHub] [kafka] kowshik commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

2020-11-17 Thread GitBox


kowshik commented on a change in pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#discussion_r525850037



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],
 
 try {
   for ((dir, dirJobs) <- jobs) {
-dirJobs.foreach(_.get)
+val hasErrors = dirJobs.exists {

Review comment:
   Thats a really good point. Done.

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],
 
 try {
   for ((dir, dirJobs) <- jobs) {
-dirJobs.foreach(_.get)
+val hasErrors = dirJobs.exists {
+  future =>

Review comment:
   Done.

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],
 
 try {
   for ((dir, dirJobs) <- jobs) {
-dirJobs.foreach(_.get)
+val hasErrors = dirJobs.exists {
+  future =>
+try {

Review comment:
   Good idea, done.









[jira] [Resolved] (KAFKA-10497) Convert group coordinator metadata schemas to use generated protocol

2020-11-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10497.

Fix Version/s: 2.8.0
   Resolution: Fixed

> Convert group coordinator metadata schemas to use generated protocol
> 
>
> Key: KAFKA-10497
> URL: https://issues.apache.org/jira/browse/KAFKA-10497
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.8.0
>
>
> We need to convert the internal schemas used for representing group metadata 
> to the generated protocol. This opens the door for flexible version support 
> on the next bump. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 merged pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-11-17 Thread GitBox


chia7712 merged pull request #9318:
URL: https://github.com/apache/kafka/pull/9318


   







[jira] [Commented] (KAFKA-9876) Implement Raft Protocol for Metadata Quorum

2020-11-17 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17234299#comment-17234299
 ] 

feyman commented on KAFKA-9876:
---

Hi [~hachikuji], I'm interested in Raft and also in KIP-595, and I would like to get involved in this. Is there any sub-task that I can pick up?

I'm currently reading the KIP document/discussion and also the implementation to get familiar. I checked with [~bchen225242] offline; he mentioned that some discussions are not finalized, so I'm just asking here~

Thanks!

> Implement Raft Protocol for Metadata Quorum
> ---
>
> Key: KAFKA-9876
> URL: https://issues.apache.org/jira/browse/KAFKA-9876
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> This tracks the completion of the Raft Protocol specified in KIP-595: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum.
>  If/when the KIP is approved by the community, we will create smaller 
> sub-tasks to track overall progress.





[GitHub] [kafka] dongjinleekr commented on a change in pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2020-11-17 Thread GitBox


dongjinleekr commented on a change in pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#discussion_r525845117



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
##
@@ -186,19 +186,29 @@ public void shouldReportDirectoryEmpty() throws 
IOException {
 
 @Test
 public void shouldThrowProcessorStateException() throws IOException {

Review comment:
   Totally agree. :smile: 









[GitHub] [kafka] ijuma commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

2020-11-17 Thread GitBox


ijuma commented on a change in pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#discussion_r525835495



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],
 
 try {
   for ((dir, dirJobs) <- jobs) {
-dirJobs.foreach(_.get)
+val hasErrors = dirJobs.exists {

Review comment:
   This looks wrong. `exists` short-circuits. I think you want `map` 
followed by `exists`.
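The short-circuiting pitfall is easy to reproduce with plain Java streams, whose `anyMatch` behaves like Scala's `exists` (the class and method names below are invented for illustration; the actual LogManager code is Scala):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ShortCircuitSketch {
    // anyMatch (like Scala's exists) stops at the first hit, so later
    // elements -- here standing in for later shutdown futures -- are
    // never evaluated.
    public static int evaluatedWithAnyMatch(List<Integer> xs) {
        int[] count = {0};
        xs.stream().anyMatch(x -> { count[0]++; return x < 0; });
        return count[0];
    }

    // map-then-check forces every element to be evaluated before the
    // error test -- analogous to awaiting every job before deciding
    // whether any of them failed.
    public static int evaluatedWithMap(List<Integer> xs) {
        int[] count = {0};
        List<Boolean> results = xs.stream()
            .map(x -> { count[0]++; return x < 0; })
            .collect(Collectors.toList());
        results.contains(true); // the actual error check
        return count[0];
    }

    public static void main(String[] args) {
        List<Integer> xs = List.of(-1, 2, 3);
        System.out.println(evaluatedWithAnyMatch(xs)); // prints 1: stopped at first match
        System.out.println(evaluatedWithMap(xs));      // prints 3: every element evaluated
    }
}
```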

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],
 
 try {
   for ((dir, dirJobs) <- jobs) {
-dirJobs.foreach(_.get)
+val hasErrors = dirJobs.exists {
+  future =>

Review comment:
   Nit: this should be in the previous line.

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File],
 
 try {
   for ((dir, dirJobs) <- jobs) {
-dirJobs.foreach(_.get)
+val hasErrors = dirJobs.exists {
+  future =>
+try {

Review comment:
   You can use `scala.util.Try` to wrap the call and get a `Success` or 
`Failure`.
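A plain-Java sketch of the `scala.util.Try` pattern suggested here (Java has no direct equivalent, so the `attempt`/`failuresOf` helpers below are hand-rolled stand-ins for illustration):

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class TrySketch {
    // Run one task, Try-style: empty Optional on success, the caught
    // Throwable on failure.
    static Optional<Throwable> attempt(Runnable task) {
        try {
            task.run();
            return Optional.empty();   // analogue of scala.util.Success
        } catch (Throwable t) {
            return Optional.of(t);     // analogue of scala.util.Failure
        }
    }

    // Run every job and only then inspect the failures -- unlike a
    // short-circuiting exists, no job is skipped.
    static List<Throwable> failuresOf(List<Runnable> jobs) {
        return jobs.stream()
                   .map(TrySketch::attempt)      // run each job
                   .flatMap(Optional::stream)    // keep only the failures
                   .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Runnable> jobs = List.of(
            () -> {},                                          // succeeds
            () -> { throw new RuntimeException("disk gone"); } // fails
        );
        System.out.println(failuresOf(jobs).size() + " job(s) failed"); // prints 1 job(s) failed
    }
}
```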









[GitHub] [kafka] dongjinleekr commented on a change in pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2020-11-17 Thread GitBox


dongjinleekr commented on a change in pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#discussion_r525835354



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
##
@@ -140,9 +140,6 @@ public void 
shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE
 IntegrationTestUtils.produceSynchronously(producerConfig, false, 
input, Optional.empty(),
 singletonList(new KeyValueTimestamp<>("k1", "v1", 0L)));
 
-TestUtils.waitForCondition(stateDir::exists,
-"Failed awaiting CreateTopics first request failure");

Review comment:
   - Previous: The test asserts that the (empty) StateStore directory is 
not deleted.
   - Now: The empty StateStore directory is deleted in the cleanup process, so this assertion is no longer valid. (Wait, would it be better to negate the logical condition instead of removing it?)









[jira] [Updated] (KAFKA-10737) MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster in README.md

2020-11-17 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10737:
---
Description: 
As MM2 is adopted by more users, new and advanced use cases require Kafka 
clusters to be secure, and one way to make this happen is an SSL-enabled cluster.

Currently there is no clear doc on how to configure MM2 so that it can consume 
from or produce to an SSL-enabled cluster. Some users spent quite an amount of 
time finding the right config (see 
https://issues.apache.org/jira/browse/KAFKA-10704). So it would be great to 
document this clearly.

  was:
As MM2 is adopted by more users, new and advanced use cases require Kafka 
clusters to be secure, and one way to make this happen is an SSL-enabled cluster.

Currently there is no clear doc on how to configure MM2 so that it can consume 
from or produce to an SSL-enabled cluster. Some users spent quite an amount of 
time finding the right config (see ). So it would be great to document this 
clearly.


> MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster 
> in README.md 
> --
>
> Key: KAFKA-10737
> URL: https://issues.apache.org/jira/browse/KAFKA-10737
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Minor
>
> As MM2 is adopted by more users, new and advanced use cases require Kafka 
> clusters to be secure, and one way to make this happen is an SSL-enabled cluster.
> Currently there is no clear doc on how to configure MM2 so that it can 
> consume from or produce to an SSL-enabled cluster. Some users spent quite an 
> amount of time finding the right config (see 
> https://issues.apache.org/jira/browse/KAFKA-10704). So it would be great to 
> document this clearly.





[jira] [Created] (KAFKA-10737) MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster in README.md

2020-11-17 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-10737:
--

 Summary: MirrorMaker 2: clarify how to produce to or consume from 
SSL-enabled cluster in README.md 
 Key: KAFKA-10737
 URL: https://issues.apache.org/jira/browse/KAFKA-10737
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Affects Versions: 2.7.0
Reporter: Ning Zhang
Assignee: Ning Zhang


As MM2 is adopted by more users, new and advanced use cases require Kafka 
clusters to be secure, and one way to make this happen is an SSL-enabled cluster.

Currently there is no clear doc on how to configure MM2 so that it can consume 
from or produce to an SSL-enabled cluster. Some users spent quite an amount of 
time finding the right config (see ). So it would be great to document this 
clearly.





[jira] [Resolved] (KAFKA-10704) Mirror maker with TLS at target

2020-11-17 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang resolved KAFKA-10704.

Resolution: Resolved

> Mirror maker with TLS at target
> ---
>
> Key: KAFKA-10704
> URL: https://issues.apache.org/jira/browse/KAFKA-10704
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Tushar Bhasme
>Assignee: Ning Zhang
>Priority: Critical
>
> We need to set up MirrorMaker from a single-node Kafka cluster to a three-node 
> Strimzi cluster. There is no SSL setup at the source; however, the target 
> cluster is configured with mTLS.
> With the config below, commands from the source cluster, like listing topics, work:
> {code:java}
> cat client-ssl.properties
> security.protocol=SSL
> ssl.truststore.location=my.truststore
> ssl.truststore.password=123456
> ssl.keystore.location=my.keystore
> ssl.keystore.password=123456
> ssl.key.password=password{code}
> However, we are not able to get MirrorMaker working with similar configs:
> {code:java}
> source.security.protocol=PLAINTEXT
> target.security.protocol=SSL
> target.ssl.truststore.location=my.truststore
> target.ssl.truststore.password=123456
> target.ssl.keystore.location=my.keystore
> target.ssl.keystore.password=123456
> target.ssl.key.password=password{code}
> Errors while running mirror maker:
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, 
> deadlineMs=1605011994642, tries=1, nextAllowedTryMs=1605011994743) timed out 
> at 1605011994643 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment. Call: fetchMetadata
> [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 
> unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
> [2020-11-10 12:40:24,643] INFO [AdminClient clientId=adminclient-8] Metadata 
> update failed 
> (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, 
> deadlineMs=1605012024643, tries=1, nextAllowedTryMs=-9223372036854775709) 
> timed out at 9223372036854775807 after 1attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient 
> thread has exited. Call: fetchMetadata
> [2020-11-10 12:40:24,644] INFO Metrics scheduler closed 
> (org.apache.kafka.common.metrics.Metrics:668)
> [2020-11-10 12:40:24,644] INFO Closing reporter 
> org.apache.kafka.common.metrics.JmxReporter 
> (org.apache.kafka.common.metrics.Metrics:672)
> [2020-11-10 12:40:24,644] INFO Metrics reporters closed 
> (org.apache.kafka.common.metrics.Metrics:678)
> [2020-11-10 12:40:24,645] ERROR Stopping due to error 
> (org.apache.kafka.connect.mirror.MirrorMaker:304)
> org.apache.kafka.connect.errors.ConnectException: Failed to connect to and 
> describe Kafka cluster. Check worker's broker connection and security 
> properties.
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:235)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.lambda$new$1(MirrorMaker.java:136)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.(MirrorMaker.java:136)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.(MirrorMaker.java:148)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:291)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, 
> deadlineMs=1605012024641, tries=1, nextAllowedTryMs=1605012024742)timed out 
> at 1605012024642 after 1 attempt(s)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
> ... 7 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=listNodes, deadlineMs=1605012024641, tries=1, 
> nextAllowedTryMs=1605012024742) timed out at 1605012024642 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment. Call: listNodes
> {code}




[jira] [Reopened] (KAFKA-10704) Mirror maker with TLS at target

2020-11-17 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reopened KAFKA-10704:


> Mirror maker with TLS at target
> ---
>
> Key: KAFKA-10704
> URL: https://issues.apache.org/jira/browse/KAFKA-10704
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Tushar Bhasme
>Assignee: Ning Zhang
>Priority: Critical
> Fix For: 2.7.0
>
>




[jira] [Updated] (KAFKA-10704) Mirror maker with TLS at target

2020-11-17 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10704:
---
Fix Version/s: (was: 2.7.0)

> Mirror maker with TLS at target
> ---
>
> Key: KAFKA-10704
> URL: https://issues.apache.org/jira/browse/KAFKA-10704
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Tushar Bhasme
>Assignee: Ning Zhang
>Priority: Critical
>
> We need to setup mirror maker from a single node kafka cluster to a three 
> node Strimzi cluster. There is no SSL setup at source, however the target 
> cluster is configured with MTLS.
> With below config, commands from source like listing topics etc are working:
> {code:java}
> cat client-ssl.properties
> security.protocol=SSL
> ssl.truststore.location=my.truststore
> ssl.truststore.password=123456
> ssl.keystore.location=my.keystore
> ssl.keystore.password=123456
> ssl.key.password=password{code}
> However, we are not able to get mirror maker working with the similar configs:
> {code:java}
> source.security.protocol=PLAINTEXT
> target.security.protocol=SSL
> target.ssl.truststore.location=my.truststore
> target.ssl.truststore.password=123456
> target.ssl.keystore.location=my.keystore
> target.ssl.keystore.password=123456
> target.ssl.key.password=password{code}
> Errors while running mirror maker:
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, 
> deadlineMs=1605011994642, tries=1, nextAllowedTryMs=1605011994743) timed out 
> at 1605011994643 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment. Call: fetchMetadata
> [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 
> unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
> [2020-11-10 12:40:24,643] INFO [AdminClient clientId=adminclient-8] Metadata 
> update failed 
> (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, 
> deadlineMs=1605012024643, tries=1, nextAllowedTryMs=-9223372036854775709) 
> timed out at 9223372036854775807 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient 
> thread has exited. Call: fetchMetadata
> [2020-11-10 12:40:24,644] INFO Metrics scheduler closed 
> (org.apache.kafka.common.metrics.Metrics:668)
> [2020-11-10 12:40:24,644] INFO Closing reporter 
> org.apache.kafka.common.metrics.JmxReporter 
> (org.apache.kafka.common.metrics.Metrics:672)
> [2020-11-10 12:40:24,644] INFO Metrics reporters closed 
> (org.apache.kafka.common.metrics.Metrics:678)
> [2020-11-10 12:40:24,645] ERROR Stopping due to error 
> (org.apache.kafka.connect.mirror.MirrorMaker:304)
> org.apache.kafka.connect.errors.ConnectException: Failed to connect to and 
> describe Kafka cluster. Check worker's broker connection and security 
> properties.
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:235)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.lambda$new$1(MirrorMaker.java:136)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:136)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:148)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:291)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, 
> deadlineMs=1605012024641, tries=1, nextAllowedTryMs=1605012024742) timed out 
> at 1605012024642 after 1 attempt(s)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
> ... 7 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=listNodes, deadlineMs=1605012024641, tries=1, 
> nextAllowedTryMs=1605012024742) timed out at 1605012024642 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment. Call: listNodes
> {code}




[jira] [Commented] (KAFKA-10704) Mirror maker with TLS at target

2020-11-17 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17234271#comment-17234271
 ] 

Ning Zhang commented on KAFKA-10704:


Indeed, we should clearly document how to produce to an SSL-enabled cluster. I 
will create a PR against 
[https://github.com/apache/kafka/blob/trunk/connect/mirror/README.md] to add a 
section about SSL-enabled clusters.
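For reference, the working setup described in this issue can be sketched as a 
single MM2 properties file. This is a minimal illustration assembled from the 
configs quoted above; the cluster aliases, bootstrap addresses, and replication 
flow lines are assumptions, not taken from the reporter's actual deployment:

```properties
# Hypothetical mm2.properties fragment: plaintext source, mTLS target.
clusters = source, target
source.bootstrap.servers = source-kafka:9092
target.bootstrap.servers = my-passive-cluster.com:443

source.security.protocol = PLAINTEXT

# Per-cluster security settings are prefixed with the cluster alias,
# matching the config quoted in this issue.
target.security.protocol = SSL
target.ssl.truststore.location = my.truststore
target.ssl.truststore.password = 123456
target.ssl.keystore.location = my.keystore
target.ssl.keystore.password = 123456
target.ssl.key.password = password

# Example replication flow (assumed for illustration).
source->target.enabled = true
source->target.topics = .*
```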

> Mirror maker with TLS at target
> ---
>
> Key: KAFKA-10704
> URL: https://issues.apache.org/jira/browse/KAFKA-10704
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Tushar Bhasme
>Assignee: Ning Zhang
>Priority: Critical
> Fix For: 2.7.0
>
>
> We need to setup mirror maker from a single node kafka cluster to a three 
> node Strimzi cluster. There is no SSL setup at source, however the target 
> cluster is configured with MTLS.
> With below config, commands from source like listing topics etc are working:
> {code:java}
> cat client-ssl.properties
> security.protocol=SSL
> ssl.truststore.location=my.truststore
> ssl.truststore.password=123456
> ssl.keystore.location=my.keystore
> ssl.keystore.password=123456
> ssl.key.password=password{code}
> However, we are not able to get mirror maker working with the similar configs:
> {code:java}
> source.security.protocol=PLAINTEXT
> target.security.protocol=SSL
> target.ssl.truststore.location=my.truststore
> target.ssl.truststore.password=123456
> target.ssl.keystore.location=my.keystore
> target.ssl.keystore.password=123456
> target.ssl.key.password=password{code}
> Errors while running mirror maker:
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, 
> deadlineMs=1605011994642, tries=1, nextAllowedTryMs=1605011994743) timed out 
> at 1605011994643 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment. Call: fetchMetadata
> [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 
> unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
> [2020-11-10 12:40:24,643] INFO [AdminClient clientId=adminclient-8] Metadata 
> update failed 
> (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, 
> deadlineMs=1605012024643, tries=1, nextAllowedTryMs=-9223372036854775709) 
> timed out at 9223372036854775807 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient 
> thread has exited. Call: fetchMetadata
> [2020-11-10 12:40:24,644] INFO Metrics scheduler closed 
> (org.apache.kafka.common.metrics.Metrics:668)
> [2020-11-10 12:40:24,644] INFO Closing reporter 
> org.apache.kafka.common.metrics.JmxReporter 
> (org.apache.kafka.common.metrics.Metrics:672)
> [2020-11-10 12:40:24,644] INFO Metrics reporters closed 
> (org.apache.kafka.common.metrics.Metrics:678)
> [2020-11-10 12:40:24,645] ERROR Stopping due to error 
> (org.apache.kafka.connect.mirror.MirrorMaker:304)
> org.apache.kafka.connect.errors.ConnectException: Failed to connect to and 
> describe Kafka cluster. Check worker's broker connection and security 
> properties.
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:235)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.lambda$new$1(MirrorMaker.java:136)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:136)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:148)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:291)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, 
> deadlineMs=1605012024641, tries=1, nextAllowedTryMs=1605012024742) timed out 
> at 1605012024642 after 1 attempt(s)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
> ... 7 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=listNodes, deadlineMs=1605012024641, tries=1, 
> nextAllowedTryMs=1605012024742) timed out at 1605012024642 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment. Call: listNodes
> {code}

[jira] [Assigned] (KAFKA-10704) Mirror maker with TLS at target

2020-11-17 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reassigned KAFKA-10704:
--

Assignee: Ning Zhang

> Mirror maker with TLS at target
> ---
>
> Key: KAFKA-10704
> URL: https://issues.apache.org/jira/browse/KAFKA-10704
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Tushar Bhasme
>Assignee: Ning Zhang
>Priority: Critical
> Fix For: 2.7.0
>
>
> We need to setup mirror maker from a single node kafka cluster to a three 
> node Strimzi cluster. There is no SSL setup at source, however the target 
> cluster is configured with MTLS.
> With below config, commands from source like listing topics etc are working:
> {code:java}
> cat client-ssl.properties
> security.protocol=SSL
> ssl.truststore.location=my.truststore
> ssl.truststore.password=123456
> ssl.keystore.location=my.keystore
> ssl.keystore.password=123456
> ssl.key.password=password{code}
> However, we are not able to get mirror maker working with the similar configs:
> {code:java}
> source.security.protocol=PLAINTEXT
> target.security.protocol=SSL
> target.ssl.truststore.location=my.truststore
> target.ssl.truststore.password=123456
> target.ssl.keystore.location=my.keystore
> target.ssl.keystore.password=123456
> target.ssl.key.password=password{code}
> Errors while running mirror maker:
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, 
> deadlineMs=1605011994642, tries=1, nextAllowedTryMs=1605011994743) timed out 
> at 1605011994643 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment. Call: fetchMetadata
> [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 
> unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
> [2020-11-10 12:40:24,643] INFO [AdminClient clientId=adminclient-8] Metadata 
> update failed 
> (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, 
> deadlineMs=1605012024643, tries=1, nextAllowedTryMs=-9223372036854775709) 
> timed out at 9223372036854775807 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient 
> thread has exited. Call: fetchMetadata
> [2020-11-10 12:40:24,644] INFO Metrics scheduler closed 
> (org.apache.kafka.common.metrics.Metrics:668)
> [2020-11-10 12:40:24,644] INFO Closing reporter 
> org.apache.kafka.common.metrics.JmxReporter 
> (org.apache.kafka.common.metrics.Metrics:672)
> [2020-11-10 12:40:24,644] INFO Metrics reporters closed 
> (org.apache.kafka.common.metrics.Metrics:678)
> [2020-11-10 12:40:24,645] ERROR Stopping due to error 
> (org.apache.kafka.connect.mirror.MirrorMaker:304)
> org.apache.kafka.connect.errors.ConnectException: Failed to connect to and 
> describe Kafka cluster. Check worker's broker connection and security 
> properties.
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:235)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.lambda$new$1(MirrorMaker.java:136)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:136)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:148)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:291)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, 
> deadlineMs=1605012024641, tries=1, nextAllowedTryMs=1605012024742) timed out 
> at 1605012024642 after 1 attempt(s)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
> ... 7 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=listNodes, deadlineMs=1605012024641, tries=1, 
> nextAllowedTryMs=1605012024742) timed out at 1605012024642 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment. Call: listNodes
> {code}

[jira] [Commented] (KAFKA-10728) Mirroring data without decompressing with MirrorMaker 2.0

2020-11-17 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17234255#comment-17234255
 ] 

Ning Zhang commented on KAFKA-10728:


My first impression is: unless the producer of MM2 is explicitly set to use 
`uncompressed` via [https://kafka.apache.org/documentation/#compression.type], 
it will use the default compression value.
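As a sketch of that suggestion, the mirroring producer's codec could be pinned 
explicitly rather than left at the default. The alias prefix and exact property 
placement here are assumptions based on the configs quoted in this thread, not a 
confirmed MM2 layout:

```properties
# Hypothetical mm2.properties fragment: force the mirroring producer to
# compress with gzip, so the target-side write uses a known codec instead
# of the producer default.
target.producer.compression.type = gzip
```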

> Mirroring data without decompressing with MirrorMaker 2.0
> -
>
> Key: KAFKA-10728
> URL: https://issues.apache.org/jira/browse/KAFKA-10728
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Eazhilan Nagarajan
>Priority: Major
>
> Hello, 
>  
> I use MirrorMaker 2.0 to copy data across two Kafka clusters and it's all 
> working fine. Recently we enabled compressing while producing data into any 
> topic which had a very positive impact on the storage and other resources but 
> while mirroring, the data seems to be decompressed at the target Kafka 
> cluster. I tried enabling compression using the below config in MM2; the data 
> at the target cluster is compressed now, but the decompress-and-re-compress 
> cycle continues to happen and it eats up a lot of resources unnecessarily.
>  
> {noformat}
> - alias: my-passive-cluster
> authentication:
>   passwordSecret:
> password: password
> secretName: passive-cluster-secret
>   type: scram-sha-512
>   username: user-1
> bootstrapServers: my-passive-cluster.com:443
> config:
>   config.storage.replication.factor: 3
>   offset.storage.replication.factor: 3
>   status.storage.replication.factor: 3
>   producer.compression.type: gzip{noformat}
>  I found a couple of Jira issues talking about it, but I don't know if the 
> shallow iterator option is available now:
> https://issues.apache.org/jira/browse/KAFKA-732, 
> https://issues.apache.org/jira/browse/KAFKA-845
>  
> Kindly let me know if this is currently available or if it will be available 
> in the future.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman opened a new pull request #9609: KAFKA-6687: restrict DSL to allow only Streams from the same source topics

2020-11-17 Thread GitBox


ableegoldman opened a new pull request #9609:
URL: https://github.com/apache/kafka/pull/9609


   Follow-up to https://github.com/apache/kafka/pull/9582
   
   Will leave the ability to create multiple KTables from the same source topic 
as follow-up work. Similarly, creating a KStream and a KTable from the same 
topic can be tackled later if need be.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9604: [MINOR] adjust the log level to error

2020-11-17 Thread GitBox


chia7712 commented on a change in pull request #9604:
URL: https://github.com/apache/kafka/pull/9604#discussion_r525799307



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -477,7 +477,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   val exceptionsSummary = mergedResponseStatus.map { case 
(topicPartition, status) =>
 topicPartition -> status.error.exceptionName
   }.mkString(", ")
-  info(
+  error(

Review comment:
   Is this message a duplicate of 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L613
 ? If so, would it be better to remove one of them?









[GitHub] [kafka] chia7712 commented on pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request

2020-11-17 Thread GitBox


chia7712 commented on pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#issuecomment-729399596


   @Lincong Could you take a look at those failed tests? I will give a review 
later :)







[GitHub] [kafka] ableegoldman commented on pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#issuecomment-729396557


   System test run (still running but so far it's all PASS) -- 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4292/







[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

2020-11-17 Thread GitBox


abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r525764228



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##
@@ -178,6 +183,11 @@ class BrokerToControllerRequestThread(networkClient: 
KafkaClient,
 }
   }
 
+  private def isTimedOut(response: ClientResponse): Boolean = {
+val requestCreatedTime = response.receivedTimeMs() - 
response.requestLatencyMs()

Review comment:
   Is it legitimate to compare with requestTimeout here since we actually 
measure the request buffered time on the broker-to-controller channel queue? 
Should we introduce a new timeout config here somehow?
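The check under discussion reduces to simple arithmetic on the response 
timestamps: reconstruct the request creation time from the receive time minus 
the measured latency, then compare the elapsed time against a budget. A 
dependency-free sketch (the class and method names mirror the PR but are 
illustrative, not the real `ClientResponse` API):

```java
// Sketch of the isTimedOut logic discussed above.
public class RequestTimeoutSketch {
    static boolean isTimedOut(long receivedTimeMs,
                              long requestLatencyMs,
                              long nowMs,
                              long requestTimeoutMs) {
        // When the request was originally created: receive time minus latency.
        long requestCreatedMs = receivedTimeMs - requestLatencyMs;
        // Timed out if more than the budget has elapsed since creation.
        // Note this window includes time the request spent queued on the
        // broker-to-controller channel, which is the concern raised above.
        return nowMs - requestCreatedMs > requestTimeoutMs;
    }

    public static void main(String[] args) {
        // Created at 800ms, now 2000ms: 1200ms elapsed against a 500ms budget.
        System.out.println(isTimedOut(1_000L, 200L, 2_000L, 500L)); // prints true
    }
}
```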









[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

2020-11-17 Thread GitBox


abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r525763121



##
File path: 
clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java
##
@@ -24,4 +24,11 @@
 
 void onComplete(ClientResponse response);
 
+/**
+ * Fire when the request transmission hits a fatal exception.
+ *
+ * @param exception the thrown exception
+ */
+default void onFailure(RuntimeException exception) {

Review comment:
   I see, if this is the case, we need a customized completion handler for 
both forwarding and AlterISR IMHO.









[GitHub] [kafka] abbccdda merged pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

2020-11-17 Thread GitBox


abbccdda merged pull request #9569:
URL: https://github.com/apache/kafka/pull/9569


   







[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525734417



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, 
final TimeUnit timeUnit) {
 return close(timeoutMs);
 }
 
-private boolean close(final long timeoutMs) {
-if (!setState(State.PENDING_SHUTDOWN)) {
-// if transition failed, it means it was either in PENDING_SHUTDOWN
-// or NOT_RUNNING already; just check that all threads have been 
stopped
-log.info("Already in the pending shutdown state, wait to complete 
shutdown");
-} else {
-stateDirCleaner.shutdownNow();
-if (rocksDBMetricsRecordingService != null) {
-rocksDBMetricsRecordingService.shutdownNow();
-}
+private Thread shutdownHelper(final boolean error) {
+stateDirCleaner.shutdownNow();
+if (rocksDBMetricsRecordingService != null) {
+rocksDBMetricsRecordingService.shutdownNow();
+}
 
-// wait for all threads to join in a separate thread;
-// save the current thread so that if it is a stream thread
-// we don't attempt to join it and cause a deadlock
-final Thread shutdownThread = new Thread(() -> {
-// notify all the threads to stop; avoid deadlocks by stopping 
any
-// further state reports from the thread since we're shutting 
down
-for (final StreamThread thread : threads) {
-thread.shutdown();
-}
+// wait for all threads to join in a separate thread;
+// save the current thread so that if it is a stream thread
+// we don't attempt to join it and cause a deadlock
+return new Thread(() -> {
+// notify all the threads to stop; avoid deadlocks by stopping any
+// further state reports from the thread since we're shutting down
+for (final StreamThread thread : threads) {
+thread.shutdown();
+}
 
-for (final StreamThread thread : threads) {
-try {
-if (!thread.isRunning()) {
-thread.join();
-}
-} catch (final InterruptedException ex) {
-Thread.currentThread().interrupt();
+for (final StreamThread thread : threads) {
+try {
+if (!thread.isRunning()) {
+thread.join();
 }
+} catch (final InterruptedException ex) {
+Thread.currentThread().interrupt();
 }
+}
 
-if (globalStreamThread != null) {
-globalStreamThread.shutdown();
-}
+if (globalStreamThread != null) {
+globalStreamThread.shutdown();
+}
 
-if (globalStreamThread != null && 
!globalStreamThread.stillRunning()) {
-try {
-globalStreamThread.join();
-} catch (final InterruptedException e) {
-Thread.currentThread().interrupt();
-}
-globalStreamThread = null;
+if (globalStreamThread != null && 
!globalStreamThread.stillRunning()) {
+try {
+globalStreamThread.join();
+} catch (final InterruptedException e) {
+Thread.currentThread().interrupt();
 }
+globalStreamThread = null;
+}
 
-adminClient.close();
+adminClient.close();
 
-streamsMetrics.removeAllClientLevelMetrics();
-metrics.close();
+streamsMetrics.removeAllClientLevelMetrics();
+metrics.close();
+if (!error) {
 setState(State.NOT_RUNNING);
-}, "kafka-streams-close-thread");
+}
+}, "kafka-streams-close-thread");
+}
+
+private boolean close(final long timeoutMs) {
+if (!setState(State.PENDING_SHUTDOWN)) {

Review comment:
   WDYT about having both NOT_RUNNING and ERROR go through 
PENDING_SHUTDOWN, rather than just transitioning directly and permanently to 
ERROR? At a high level I think it just makes sense for ERROR and NOT_RUNNING to 
be symmetric. Also any benefit to having an intermediate PENDING_SHUTDOWN for 
the NOT_RUNNING case presumably applies to the ERROR case as well. eg, it 
indicates whether Streams has completed its shutdown or not: users know that an 
app in PENDING_SHUTDOWN should never be killed, its only safe to do so once it 
reaches NOT_RUNNING. We should provide the same fun

[GitHub] [kafka] showuon commented on pull request #9507: KAFKA-10628: remove all the unnecessary parameters from the tests which are using TopologyTestDriver

2020-11-17 Thread GitBox


showuon commented on pull request #9507:
URL: https://github.com/apache/kafka/pull/9507#issuecomment-729351004


   @vvcephei @chia7712  , please help review. Thanks.







[GitHub] [kafka] showuon commented on pull request #9104: KAFKA-10266: Update the connector config header.converter

2020-11-17 Thread GitBox


showuon commented on pull request #9104:
URL: https://github.com/apache/kafka/pull/9104#issuecomment-729350784


   @kkonstantine , please help review. Thanks.







[GitHub] [kafka] jacky1193610322 commented on pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-11-17 Thread GitBox


jacky1193610322 commented on pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#issuecomment-729349575


   Thanks for your reply. I have also read KIP-500 and KIP-631; the fencing 
there is good, but it will be released in a few months. Before that, I think we 
also need to try our best to fence the broker once the controller already 
thinks the broker has died. In other words, we should fence 2-way. 
   `
   self-fence after getting an invalid version error from AlterIsr
   `
   Yes, I think we need to self-fence when the session is lost; we can't rely on 
receiving the other machine's response, because we can't receive the response 
when the Broker2ControllerChannel is broken.
   Please let me know if you create a jira. 







[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525701691



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when an {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * The handler will execute on the thread that produced the exception.
+ * In order to get the thread that threw the exception, 
Thread.currentThread().
+ * 
+ * Note, this handler must be threadsafe, since it will be shared among 
all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler 
of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final Consumer<Throwable> handler = exception -> 
handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+Objects.requireNonNull(streamsUncaughtExceptionHandler);
+for (final StreamThread thread : threads) {
+thread.setStreamsUncaughtExceptionHandler(handler);
+}
+if (globalStreamThread != null) {
+globalStreamThread.setUncaughtExceptionHandler(handler);
+}
+} else {
+throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
+"Current state is: " + state);
+}
+}
+}
+
+private void defaultStreamsUncaughtExceptionHandler(final Throwable 
throwable) {
+if (oldHandler) {
+if (throwable instanceof RuntimeException) {
+throw (RuntimeException) throwable;
+} else if (throwable instanceof Error) {
+throw (Error) throwable;
+} else {
+throw new RuntimeException("Unexpected checked exception 
caught in the uncaught exception handler", throwable);
+}
+} else {
+handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT);
+}
+}
+
+private void handleStreamsUncaughtException(final Throwable throwable,
+final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action = streamsUncaughtExceptionHandler.handle(throwable);
+if (oldHandler) {
+log.warn("Stream's new uncaught exception handler is set as well 
as the deprecated old handler." +
+"The old handler will be ignored as long as a new handler 
is set.");
+}
+switch (action) {
+case SHUTDOWN_CLIENT:
+log.error("Encountered the following exception during 
processing " +
+"and the registered exception handler opted to " + 
action + "." +
+" The streams client is going to shut down now. ", 
throwable);
+close(Duration.ZERO);
+break;
+case SHUTDOWN_APPLICATION:
+if (throwable instanceof Error) {
+log.error("This option requires running threads to shut 
down the application." +
+"but the uncaught exception was an Error, which 
means this runtime is no " +
+"longer in a well-defined state. Attempting to 
send the shutdown command anyway.", throwable);
+}
+if (Thread.currentThread().equals(globalStreamThread) && 
threads.stream().noneMatch(StreamThread::isRunning)) {
+log.error("Exception in global thread caused the 
application to attempt to shutdown." +
+" This action will succeed only if there is at 
least one StreamThread running on this client." +
+" Currently there are no running threads so will 
now close the client.");
+close(Duration.ZERO);

Review comment:
   That's fair. I guess I was thinking less about the inherent meaning of 
ERROR vs NOT_RUNNING, and mor

[jira] [Updated] (KAFKA-10736) Convert transaction coordinator metadata schemas to use generated protocol

2020-11-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10736:
---
Description: 
We need to convert the internal schemas used for representing transaction 
metadata to the generated protocol. This opens the door for flexible version 
support on the next bump. 

similar to https://issues.apache.org/jira/browse/KAFKA-10497

  was:
We need to convert the internal schemas used for representing transaction 
metadata to the generated protocol. This opens the door for flexible version 
support on the next bump. 

similar to KAFKA-10947


> Convert transaction coordinator metadata schemas to use generated protocol
> --
>
> Key: KAFKA-10736
> URL: https://issues.apache.org/jira/browse/KAFKA-10736
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> We need to convert the internal schemas used for representing transaction 
> metadata to the generated protocol. This opens the door for flexible version 
> support on the next bump. 
> similar to https://issues.apache.org/jira/browse/KAFKA-10497



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10736) Convert transaction coordinator metadata schemas to use generated protocol

2020-11-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10736:
---
Description: 
We need to convert the internal schemas used for representing transaction 
metadata to the generated protocol. This opens the door for flexible version 
support on the next bump. 

similar to KAFKA-10947

> Convert transaction coordinator metadata schemas to use generated protocol
> --
>
> Key: KAFKA-10736
> URL: https://issues.apache.org/jira/browse/KAFKA-10736
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> We need to convert the internal schemas used for representing transaction 
> metadata to the generated protocol. This opens the door for flexible version 
> support on the next bump. 
> similar to KAFKA-10947





[jira] [Updated] (KAFKA-10497) Convert group/transaction coordinator metadata schemas to use generated protocol

2020-11-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10497:
---
Description: We need to convert the internal schemas used for representing 
group metadata to the generated protocol. This opens the door for flexible 
version support on the next bump.   (was: We need to convert the internal 
schemas used for representing transaction/group metadata to the generated 
protocol. This opens the door for flexible version support on the next bump. )

> Convert group/transaction coordinator metadata schemas to use generated 
> protocol
> 
>
> Key: KAFKA-10497
> URL: https://issues.apache.org/jira/browse/KAFKA-10497
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> We need to convert the internal schemas used for representing group metadata 
> to the generated protocol. This opens the door for flexible version support 
> on the next bump. 





[jira] [Updated] (KAFKA-10497) Convert group coordinator metadata schemas to use generated protocol

2020-11-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10497:
---
Summary: Convert group coordinator metadata schemas to use generated 
protocol  (was: Convert group/transaction coordinator metadata schemas to use 
generated protocol)

> Convert group coordinator metadata schemas to use generated protocol
> 
>
> Key: KAFKA-10497
> URL: https://issues.apache.org/jira/browse/KAFKA-10497
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> We need to convert the internal schemas used for representing group metadata 
> to the generated protocol. This opens the door for flexible version support 
> on the next bump. 





[jira] [Created] (KAFKA-10736) Convert transaction coordinator metadata schemas to use generated protocol

2020-11-17 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10736:
--

 Summary: Convert transaction coordinator metadata schemas to use 
generated protocol
 Key: KAFKA-10736
 URL: https://issues.apache.org/jira/browse/KAFKA-10736
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai








[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525692960



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, 
final TimeUnit timeUnit) {
 return close(timeoutMs);
 }
 
-private boolean close(final long timeoutMs) {
-if (!setState(State.PENDING_SHUTDOWN)) {
-// if transition failed, it means it was either in PENDING_SHUTDOWN
-// or NOT_RUNNING already; just check that all threads have been 
stopped
-log.info("Already in the pending shutdown state, wait to complete 
shutdown");
-} else {
-stateDirCleaner.shutdownNow();
-if (rocksDBMetricsRecordingService != null) {
-rocksDBMetricsRecordingService.shutdownNow();
-}
+private Thread shutdownHelper(final boolean error) {
+stateDirCleaner.shutdownNow();
+if (rocksDBMetricsRecordingService != null) {
+rocksDBMetricsRecordingService.shutdownNow();
+}
 
-// wait for all threads to join in a separate thread;
-// save the current thread so that if it is a stream thread
-// we don't attempt to join it and cause a deadlock
-final Thread shutdownThread = new Thread(() -> {
-// notify all the threads to stop; avoid deadlocks by stopping 
any
-// further state reports from the thread since we're shutting 
down
-for (final StreamThread thread : threads) {
-thread.shutdown();
-}
+// wait for all threads to join in a separate thread;
+// save the current thread so that if it is a stream thread
+// we don't attempt to join it and cause a deadlock
+return new Thread(() -> {
+// notify all the threads to stop; avoid deadlocks by stopping any
+// further state reports from the thread since we're shutting down
+for (final StreamThread thread : threads) {
+thread.shutdown();
+}
 
-for (final StreamThread thread : threads) {
-try {
-if (!thread.isRunning()) {
-thread.join();
-}
-} catch (final InterruptedException ex) {
-Thread.currentThread().interrupt();
+for (final StreamThread thread : threads) {
+try {
+if (!thread.isRunning()) {
+thread.join();
 }
+} catch (final InterruptedException ex) {
+Thread.currentThread().interrupt();
 }
+}
 
-if (globalStreamThread != null) {
-globalStreamThread.shutdown();
-}
+if (globalStreamThread != null) {
+globalStreamThread.shutdown();
+}

Review comment:
   Eh, I wouldn't bother with an AK ticket if this will be tackled in the 
next PR. I'll just make a list of all the minor followup work somewhere to keep 
track
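
The diff under discussion saves the current thread so that a close triggered from inside a managed stream thread never tries to join itself. A minimal plain-Java sketch of that guard, with illustrative names and no Kafka dependency:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative stand-in for the close logic: signal all workers via a flag,
// then join every worker except the calling thread, because joining the
// calling thread would deadlock. Not the actual KafkaStreams implementation.
public class SelfJoinSafeShutdown {
    private final List<Thread> workers = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;

    public void register(Thread t) { workers.add(t); }

    public boolean isRunning() { return running; }

    // Returns how many threads were joined (the caller is skipped).
    public int shutdown() throws InterruptedException {
        running = false;                        // signal every worker to stop
        Thread caller = Thread.currentThread(); // save the current thread
        int joined = 0;
        for (Thread t : workers) {
            if (t != caller) {                  // joining ourselves would deadlock
                t.join();
                joined++;
            }
        }
        return joined;
    }
}
```

The same shape works whether close is invoked from an external caller or from one of the managed threads; in the latter case the self-join is simply skipped.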





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-17 Thread GitBox


chia7712 commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r525690436



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##
@@ -100,16 +113,92 @@ public void produceResponseRecordErrorsTest() {
 ProduceResponse response = new ProduceResponse(responseData);
 Struct struct = response.toStruct(ver);
 assertEquals("Should use schema version " + ver, 
ApiKeys.PRODUCE.responseSchema(ver), struct.schema());
-ProduceResponse.PartitionResponse deserialized = new 
ProduceResponse(struct).responses().get(tp);
+ProduceResponse.PartitionResponse deserialized = new 
ProduceResponse(new ProduceResponseData(struct, ver)).responses().get(tp);
 if (ver >= 8) {
 assertEquals(1, deserialized.recordErrors.size());
 assertEquals(3, deserialized.recordErrors.get(0).batchIndex);
 assertEquals("Record error", 
deserialized.recordErrors.get(0).message);
 assertEquals("Produce failed", deserialized.errorMessage);
 } else {
 assertEquals(0, deserialized.recordErrors.size());
-assertEquals(null, deserialized.errorMessage);
+assertNull(deserialized.errorMessage);
 }
 }
 }
+
+/**
+ * The schema in this test is from previous code; the automated protocol should be compatible with the previous schema.
+ */
+@Test
+public void testCompatibility() {

Review comment:
   That makes sense to me. Will remove redundant test.









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525686843



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##
@@ -311,6 +314,8 @@ public void run() {
 "Updating global state failed. You can restart KafkaStreams to 
recover from this error.",
 recoverableException

Review comment:
   Like in StreamThread, we can just add a call to the handler.









[GitHub] [kafka] chia7712 commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-17 Thread GitBox


chia7712 commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r525683594



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##
@@ -560,13 +562,24 @@ private void handleProduceResponse(ClientResponse response, Map
https://issues.apache.org/jira/browse/KAFKA-10696

Review comment:
   copy that









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525681642



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, 
final TimeUnit timeUnit) {
 return close(timeoutMs);
 }
 
-private boolean close(final long timeoutMs) {
-if (!setState(State.PENDING_SHUTDOWN)) {
-// if transition failed, it means it was either in PENDING_SHUTDOWN
-// or NOT_RUNNING already; just check that all threads have been 
stopped
-log.info("Already in the pending shutdown state, wait to complete 
shutdown");
-} else {
-stateDirCleaner.shutdownNow();
-if (rocksDBMetricsRecordingService != null) {
-rocksDBMetricsRecordingService.shutdownNow();
-}
+private Thread shutdownHelper(final boolean error) {
+stateDirCleaner.shutdownNow();
+if (rocksDBMetricsRecordingService != null) {
+rocksDBMetricsRecordingService.shutdownNow();
+}
 
-// wait for all threads to join in a separate thread;
-// save the current thread so that if it is a stream thread
-// we don't attempt to join it and cause a deadlock
-final Thread shutdownThread = new Thread(() -> {
-// notify all the threads to stop; avoid deadlocks by stopping 
any
-// further state reports from the thread since we're shutting 
down
-for (final StreamThread thread : threads) {
-thread.shutdown();
-}
+// wait for all threads to join in a separate thread;
+// save the current thread so that if it is a stream thread
+// we don't attempt to join it and cause a deadlock
+return new Thread(() -> {
+// notify all the threads to stop; avoid deadlocks by stopping any
+// further state reports from the thread since we're shutting down
+for (final StreamThread thread : threads) {
+thread.shutdown();
+}
 
-for (final StreamThread thread : threads) {
-try {
-if (!thread.isRunning()) {
-thread.join();
-}
-} catch (final InterruptedException ex) {
-Thread.currentThread().interrupt();
+for (final StreamThread thread : threads) {
+try {
+if (!thread.isRunning()) {
+thread.join();
 }
+} catch (final InterruptedException ex) {
+Thread.currentThread().interrupt();
 }
+}
 
-if (globalStreamThread != null) {
-globalStreamThread.shutdown();
-}
+if (globalStreamThread != null) {
+globalStreamThread.shutdown();
+}
 
-if (globalStreamThread != null && 
!globalStreamThread.stillRunning()) {
-try {
-globalStreamThread.join();
-} catch (final InterruptedException e) {
-Thread.currentThread().interrupt();
-}
-globalStreamThread = null;
+if (globalStreamThread != null && 
!globalStreamThread.stillRunning()) {
+try {
+globalStreamThread.join();
+} catch (final InterruptedException e) {
+Thread.currentThread().interrupt();
 }
+globalStreamThread = null;
+}
 
-adminClient.close();
+adminClient.close();
 
-streamsMetrics.removeAllClientLevelMetrics();
-metrics.close();
+streamsMetrics.removeAllClientLevelMetrics();
+metrics.close();
+if (!error) {
 setState(State.NOT_RUNNING);
-}, "kafka-streams-close-thread");
+}
+}, "kafka-streams-close-thread");
+}
+
+private boolean close(final long timeoutMs) {
+if (!setState(State.PENDING_SHUTDOWN)) {

Review comment:
   This is currently the plan to remove that transition. It is pretty much 
the only change we plan to make to the FSM.









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525680874



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when an {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * The handler will execute on the thread that produced the exception.
+ * In order to get the thread that threw the exception, 
Thread.currentThread().
+ * 
+ * Note, this handler must be threadsafe, since it will be shared among 
all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler 
of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final Consumer handler = exception -> 
handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+Objects.requireNonNull(streamsUncaughtExceptionHandler);
+for (final StreamThread thread : threads) {
+thread.setStreamsUncaughtExceptionHandler(handler);
+}
+if (globalStreamThread != null) {
+globalStreamThread.setUncaughtExceptionHandler(handler);
+}
+} else {
+throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
+"Current state is: " + state);
+}
+}
+}
+
+private void defaultStreamsUncaughtExceptionHandler(final Throwable 
throwable) {
+if (oldHandler) {
+if (throwable instanceof RuntimeException) {
+throw (RuntimeException) throwable;
+} else if (throwable instanceof Error) {
+throw (Error) throwable;
+} else {
+throw new RuntimeException("Unexpected checked exception 
caught in the uncaught exception handler", throwable);
+}
+} else {
+handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT);
+}
+}
+
+private void handleStreamsUncaughtException(final Throwable throwable,
+final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action = streamsUncaughtExceptionHandler.handle(throwable);
+if (oldHandler) {
+log.warn("Stream's new uncaught exception handler is set as well 
as the deprecated old handler." +
+"The old handler will be ignored as long as a new handler 
is set.");
+}
+switch (action) {
+case SHUTDOWN_CLIENT:
+log.error("Encountered the following exception during processing " +
+"and the registered exception handler opted to " + action + "." +
+" The streams client is going to shut down now. ", throwable);
+close(Duration.ZERO);
+break;
+case SHUTDOWN_APPLICATION:
+if (throwable instanceof Error) {
+log.error("This option requires running threads to shut down the application." +
+"but the uncaught exception was an Error, which means this runtime is no " +
+"longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
+}
+if (Thread.currentThread().equals(globalStreamThread) && threads.stream().noneMatch(StreamThread::isRunning)) {
+log.error("Exception in global thread caused the application to attempt to shutdown." +
+" This action will succeed only if there is at least one StreamThread running on this client." +
+" Currently there are no running threads so will now close the client.");
+close(Duration.ZERO);

Review comment:
   I am on the fence about this. I do think it would be consistent to be not running, but also it did shut
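
The deprecated-handler branch in the diff above rethrows RuntimeExceptions and Errors unchanged and wraps checked exceptions. A plain-Java sketch of those rules (the method name is illustrative, not the actual KafkaStreams API):

```java
// Sketch of the deprecated-handler rethrow rules: RuntimeExceptions and
// Errors propagate as-is, while checked exceptions are wrapped in a
// RuntimeException before being rethrown.
public class RethrowRules {
    public static Throwable normalize(Throwable throwable) {
        if (throwable instanceof RuntimeException || throwable instanceof Error) {
            return throwable; // rethrown unchanged by the caller
        }
        // checked exceptions are unexpected in an uncaught-exception handler
        return new RuntimeException(
            "Unexpected checked exception caught in the uncaught exception handler", throwable);
    }
}
```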

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525678234



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, 
final TimeUnit timeUnit) {
 return close(timeoutMs);
 }
 
-private boolean close(final long timeoutMs) {
-if (!setState(State.PENDING_SHUTDOWN)) {
-// if transition failed, it means it was either in PENDING_SHUTDOWN
-// or NOT_RUNNING already; just check that all threads have been 
stopped
-log.info("Already in the pending shutdown state, wait to complete 
shutdown");
-} else {
-stateDirCleaner.shutdownNow();
-if (rocksDBMetricsRecordingService != null) {
-rocksDBMetricsRecordingService.shutdownNow();
-}
+private Thread shutdownHelper(final boolean error) {
+stateDirCleaner.shutdownNow();
+if (rocksDBMetricsRecordingService != null) {
+rocksDBMetricsRecordingService.shutdownNow();
+}
 
-// wait for all threads to join in a separate thread;
-// save the current thread so that if it is a stream thread
-// we don't attempt to join it and cause a deadlock
-final Thread shutdownThread = new Thread(() -> {
-// notify all the threads to stop; avoid deadlocks by stopping 
any
-// further state reports from the thread since we're shutting 
down
-for (final StreamThread thread : threads) {
-thread.shutdown();
-}
+// wait for all threads to join in a separate thread;
+// save the current thread so that if it is a stream thread
+// we don't attempt to join it and cause a deadlock
+return new Thread(() -> {
+// notify all the threads to stop; avoid deadlocks by stopping any
+// further state reports from the thread since we're shutting down
+for (final StreamThread thread : threads) {
+thread.shutdown();
+}
 
-for (final StreamThread thread : threads) {
-try {
-if (!thread.isRunning()) {
-thread.join();
-}
-} catch (final InterruptedException ex) {
-Thread.currentThread().interrupt();
+for (final StreamThread thread : threads) {
+try {
+if (!thread.isRunning()) {
+thread.join();
 }
+} catch (final InterruptedException ex) {
+Thread.currentThread().interrupt();
 }
+}
 
-if (globalStreamThread != null) {
-globalStreamThread.shutdown();
-}
+if (globalStreamThread != null) {
+globalStreamThread.shutdown();
+}

Review comment:
   You are right, I think. I just copied from the normal close method because I knew it worked. In a follow-up we can maybe change both of these. Do you think there should be an AK ticket to track it?









[jira] [Created] (KAFKA-10735) Kafka producer producing corrupted avro values when confluent cluster is recreated and producer application is not restarted

2020-11-17 Thread Tim Tattersall (Jira)
Tim Tattersall created KAFKA-10735:
--

 Summary: Kafka producer producing corrupted avro values when 
confluent cluster is recreated and producer application is not restarted
 Key: KAFKA-10735
 URL: https://issues.apache.org/jira/browse/KAFKA-10735
 Project: Kafka
  Issue Type: Bug
Reporter: Tim Tattersall


*Our Environment (AWS):*

1 x EC2 instance running 4 docker containers (using docker-compose)
 * cp-kafka 5.5.1
 * cp-zookeeper 5.5.1
 * cp-schema-registry 5.5.1
 * cp-enterprise-control-center 5.5.1

1 x ECS service running a single java application with spring-kafka producer

Topics are using String key and Avro value

 

*Problem:*
 * Avro values published after confluent cluster is recreated are corrupted. 
Expecting Avro json structure, received string value with corrupted Avro details
 ** Expected: {"metadata":{"nabEventVersion":"1.0","type":"Kafka IBMMQ sink 
connector","schemaUrl": ...*ongoing*
 ** Actual: 1.08Kafka IBMMQ source 
connector^kafka-conector-ibm-mq-source-entitlements-check\Kafka IBMMQ source 
connector - sourced*ongoing*

 

*How to Reproduce*
 # Using an existing confluent cluster
 # Start a kafka producer java application (ours running with spring-kafka)
 # Destroy the existing confluent cluster (using docker-compose down)
 # Recreate the confluent cluster (using docker-compose up)
 # Add the topic back onto the new cluster
 # Trigger a message to be produced by the running Kafka producer

 

*Current Workaround*
 * Killing running tasks on ECS service and allowing AWS to start new ones





[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525663640



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -559,18 +552,51 @@ void runLoop() {
 }
 } catch (final TaskCorruptedException e) {
 log.warn("Detected the states of tasks " + 
e.corruptedTaskWithChangelogs() + " are corrupted. " +
- "Will close the task as dirty and re-create and 
bootstrap from scratch.", e);
+"Will close the task as dirty and re-create and 
bootstrap from scratch.", e);
 try {
 
taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
 } catch (final TaskMigratedException taskMigrated) {
 handleTaskMigrated(taskMigrated);
 }
 } catch (final TaskMigratedException e) {
 handleTaskMigrated(e);
+} catch (final UnsupportedVersionException e) {
+final String errorMessage = e.getMessage();
+if (errorMessage != null &&
+errorMessage.startsWith("Broker unexpectedly doesn't 
support requireStable flag on version ")) {
+
+log.error("Shutting down because the Kafka cluster seems 
to be on a too old version. " +

Review comment:
   We should remember to update the wording here when we add the 
REPLACE_THREAD functionality









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525658639



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##
@@ -311,6 +314,8 @@ public void run() {
 "Updating global state failed. You can restart KafkaStreams to 
recover from this error.",
 recoverableException

Review comment:
   Hm, OK, this might be a problem. Since this is thrown from another catch block and not from the try block, it won't be caught by the catch block below and will slip through the exception handler.
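
The hazard being pointed out is standard Java semantics: an exception thrown inside a catch block is never handled by a sibling catch clause of the same try statement; it propagates to the caller. A minimal, self-contained demonstration:

```java
// Demonstrates that sibling catch clauses do not catch exceptions thrown
// from each other's bodies; only an enclosing try (here, in run()) can.
public class CatchBlockEscape {
    public static String run() {
        try {
            inner();
            return "no exception";
        } catch (IllegalStateException e) {
            return "outer caught: " + e.getMessage();
        }
    }

    static void inner() {
        try {
            throw new IllegalArgumentException("original");
        } catch (IllegalArgumentException e) {
            // rethrowing here skips the sibling catch below entirely
            throw new IllegalStateException("thrown from catch block");
        } catch (IllegalStateException e) {
            // never reached: siblings do not catch each other's throws
            throw new AssertionError("unreachable");
        }
    }
}
```

This is why an exception raised in the catch block of `GlobalStreamThread.run()` would bypass the handler installed in a later catch of the same try.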









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525650632



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, 
final TimeUnit timeUnit) {
 return close(timeoutMs);
 }
 
-private boolean close(final long timeoutMs) {
-if (!setState(State.PENDING_SHUTDOWN)) {
-// if transition failed, it means it was either in PENDING_SHUTDOWN
-// or NOT_RUNNING already; just check that all threads have been 
stopped
-log.info("Already in the pending shutdown state, wait to complete 
shutdown");
-} else {
-stateDirCleaner.shutdownNow();
-if (rocksDBMetricsRecordingService != null) {
-rocksDBMetricsRecordingService.shutdownNow();
-}
+private Thread shutdownHelper(final boolean error) {
+stateDirCleaner.shutdownNow();
+if (rocksDBMetricsRecordingService != null) {
+rocksDBMetricsRecordingService.shutdownNow();
+}
 
-// wait for all threads to join in a separate thread;
-// save the current thread so that if it is a stream thread
-// we don't attempt to join it and cause a deadlock
-final Thread shutdownThread = new Thread(() -> {
-// notify all the threads to stop; avoid deadlocks by stopping 
any
-// further state reports from the thread since we're shutting 
down
-for (final StreamThread thread : threads) {
-thread.shutdown();
-}
+// wait for all threads to join in a separate thread;
+// save the current thread so that if it is a stream thread
+// we don't attempt to join it and cause a deadlock
+return new Thread(() -> {
+// notify all the threads to stop; avoid deadlocks by stopping any
+// further state reports from the thread since we're shutting down
+for (final StreamThread thread : threads) {
+thread.shutdown();
+}
 
-for (final StreamThread thread : threads) {
-try {
-if (!thread.isRunning()) {
-thread.join();
-}
-} catch (final InterruptedException ex) {
-Thread.currentThread().interrupt();
+for (final StreamThread thread : threads) {
+try {
+if (!thread.isRunning()) {
+thread.join();
 }
+} catch (final InterruptedException ex) {
+Thread.currentThread().interrupt();
 }
+}
 
-if (globalStreamThread != null) {
-globalStreamThread.shutdown();
-}
+if (globalStreamThread != null) {
+globalStreamThread.shutdown();
+}
 
-if (globalStreamThread != null && 
!globalStreamThread.stillRunning()) {
-try {
-globalStreamThread.join();
-} catch (final InterruptedException e) {
-Thread.currentThread().interrupt();
-}
-globalStreamThread = null;
+if (globalStreamThread != null && 
!globalStreamThread.stillRunning()) {
+try {
+globalStreamThread.join();
+} catch (final InterruptedException e) {
+Thread.currentThread().interrupt();
 }
+globalStreamThread = null;
+}
 
-adminClient.close();
+adminClient.close();
 
-streamsMetrics.removeAllClientLevelMetrics();
-metrics.close();
+streamsMetrics.removeAllClientLevelMetrics();
+metrics.close();
+if (!error) {
 setState(State.NOT_RUNNING);
-}, "kafka-streams-close-thread");
+}
+}, "kafka-streams-close-thread");
+}
+
+private boolean close(final long timeoutMs) {
+if (!setState(State.PENDING_SHUTDOWN)) {

Review comment:
   I just realized that this is going to be a problem with the way the ERROR state is being used. If we `closeToError` then we transition to ERROR and shut down; however, `ERROR -> PENDING_SHUTDOWN` is still an allowed transition, so there's nothing to prevent the shutdown from being triggered again when a user calls `close()`. And note that a lot of users most likely have a state listener at the moment which does exactly that, i.e. when it sees a transition to ERROR it immediately invokes close (because that's what you should do with the current semantics).
   Just another th
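
The double-shutdown concern can be sketched with a toy FSM guard; the states and the `allowErrorToPendingShutdown` switch below are a simplified stand-in for the real KafkaStreams FSM, not its actual implementation:

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy FSM: if ERROR -> PENDING_SHUTDOWN remains legal, a close() issued
// after closeToError() re-runs the shutdown path; disallowing it makes the
// second close a no-op.
public class FsmGuard {
    public enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final boolean allowErrorToPendingShutdown;
    public int shutdownsTriggered = 0;

    public FsmGuard(boolean allowErrorToPendingShutdown) {
        this.allowErrorToPendingShutdown = allowErrorToPendingShutdown;
    }

    private boolean setPendingShutdown() {
        while (true) {
            State s = state.get();
            boolean legal = s == State.RUNNING
                || (s == State.ERROR && allowErrorToPendingShutdown);
            if (!legal) return false;
            if (state.compareAndSet(s, State.PENDING_SHUTDOWN)) return true;
        }
    }

    public void closeToError() {
        if (setPendingShutdown()) {
            shutdownsTriggered++;
            state.set(State.ERROR);   // error shutdown ends in ERROR, not NOT_RUNNING
        }
    }

    public void close() {
        if (setPendingShutdown()) {
            shutdownsTriggered++;
            state.set(State.NOT_RUNNING);
        }
    }
}
```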

[GitHub] [kafka] hachikuji commented on a change in pull request #9608: MINOR: Enable testLogCleanerStats

2020-11-17 Thread GitBox


hachikuji commented on a change in pull request #9608:
URL: https://github.com/apache/kafka/pull/9608#discussion_r525646356



##
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##
@@ -815,9 +815,10 @@ class LogCleanerTest {
(0 until leo.toInt by 2).forall(!keys.contains(_)))
   }
 
+  @Test
   def testLogCleanerStats(): Unit = {
-// because loadFactor is 0.75, this means we can fit 2 messages in the map
-val cleaner = makeCleaner(2)
+// because loadFactor is 0.75, this means we can fit 3 messages in the map

Review comment:
   Sorry, but we have several identical comments in other test cases. Are 
those comments also wrong?
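
The disagreement above turns on how a load factor converts physical map slots into usable entries. As generic hash-map arithmetic only (the actual sizing of the cleaner's offset map may differ), a structure with N slots and load factor f holds floor(N * f) entries before it is considered full:

```java
// Generic load-factor arithmetic; illustrative only, not the cleaner's
// actual SkimpyOffsetMap sizing.
public class LoadFactorMath {
    public static int usableEntries(int slots, double loadFactor) {
        return (int) Math.floor(slots * loadFactor);
    }
}
```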









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525640088



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
         return close(timeoutMs);
     }
 
-    private boolean close(final long timeoutMs) {
-        if (!setState(State.PENDING_SHUTDOWN)) {
-            // if transition failed, it means it was either in PENDING_SHUTDOWN
-            // or NOT_RUNNING already; just check that all threads have been stopped
-            log.info("Already in the pending shutdown state, wait to complete shutdown");
-        } else {
-            stateDirCleaner.shutdownNow();
-            if (rocksDBMetricsRecordingService != null) {
-                rocksDBMetricsRecordingService.shutdownNow();
-            }
+    private Thread shutdownHelper(final boolean error) {
+        stateDirCleaner.shutdownNow();
+        if (rocksDBMetricsRecordingService != null) {
+            rocksDBMetricsRecordingService.shutdownNow();
+        }
 
-            // wait for all threads to join in a separate thread;
-            // save the current thread so that if it is a stream thread
-            // we don't attempt to join it and cause a deadlock
-            final Thread shutdownThread = new Thread(() -> {
-                // notify all the threads to stop; avoid deadlocks by stopping any
-                // further state reports from the thread since we're shutting down
-                for (final StreamThread thread : threads) {
-                    thread.shutdown();
-                }
+        // wait for all threads to join in a separate thread;
+        // save the current thread so that if it is a stream thread
+        // we don't attempt to join it and cause a deadlock
+        return new Thread(() -> {
+            // notify all the threads to stop; avoid deadlocks by stopping any
+            // further state reports from the thread since we're shutting down
+            for (final StreamThread thread : threads) {
+                thread.shutdown();
+            }
 
-            for (final StreamThread thread : threads) {
-                try {
-                    if (!thread.isRunning()) {
-                        thread.join();
-                    }
-                } catch (final InterruptedException ex) {
-                    Thread.currentThread().interrupt();
-                }
-            }
+            for (final StreamThread thread : threads) {
+                try {
+                    if (!thread.isRunning()) {
+                        thread.join();
+                    }
+                } catch (final InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                }
+            }
 
-            if (globalStreamThread != null) {
-                globalStreamThread.shutdown();
-            }
+            if (globalStreamThread != null) {
+                globalStreamThread.shutdown();
+            }

Review comment:
   Why do we shut down the global thread only after all stream threads have 
completed their shutdown? Seems like it would be more efficient to send the 
shutdown signal to everyone first, and then wait for all the threads to join. 
Can you try this out in the followup PR?
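The signal-first, join-later ordering suggested here can be sketched with plain `java.lang.Thread` standing in for `StreamThread`/`GlobalStreamThread`; this is only an illustration of the pattern, not Kafka's actual shutdown code:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "send the shutdown signal to everyone first, then wait for all
// the threads to join", so slow threads overlap their shutdown work instead
// of being stopped one at a time.
public class ParallelShutdown {
    public static void shutdownAll(List<Thread> streamThreads, Thread globalThread)
            throws InterruptedException {
        // Phase 1: deliver the shutdown signal to every thread up front.
        for (Thread t : streamThreads) {
            t.interrupt();
        }
        if (globalThread != null) {
            globalThread.interrupt();
        }
        // Phase 2: only now block waiting for each thread to finish.
        for (Thread t : streamThreads) {
            t.join();
        }
        if (globalThread != null) {
            globalThread.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(10_000); // simulated work, cut short by the interrupt
                } catch (InterruptedException expected) {
                    // treat the interrupt as the shutdown signal and exit
                }
            });
            t.start();
            threads.add(t);
        }
        shutdownAll(threads, null);
        System.out.println("all threads stopped");
    }
}
```

With the sequential ordering, total shutdown time is roughly the sum of the per-thread shutdown times; with the two-phase ordering it approaches the maximum of them.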









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525636554



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
     }
 }
 
+/**
+ * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor logic.
+ * The handler will execute on the thread that produced the exception.
+ * In order to get the thread that threw the exception, Thread.currentThread().
+ *
+ * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+ * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+ */
+public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+    final Consumer<Throwable> handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+    synchronized (stateLock) {
+        if (state == State.CREATED) {
+            Objects.requireNonNull(streamsUncaughtExceptionHandler);
+            for (final StreamThread thread : threads) {
+                thread.setStreamsUncaughtExceptionHandler(handler);
+            }
+            if (globalStreamThread != null) {
+                globalStreamThread.setUncaughtExceptionHandler(handler);
+            }
+        } else {
+            throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                "Current state is: " + state);
+        }
+    }
+}
+
+private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) {
+    if (oldHandler) {
+        if (throwable instanceof RuntimeException) {
+            throw (RuntimeException) throwable;
+        } else if (throwable instanceof Error) {
+            throw (Error) throwable;
+        } else {
+            throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
+        }
+    } else {
+        handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT);
+    }
+}
+
+private void handleStreamsUncaughtException(final Throwable throwable,
+                                            final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+    final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable);
+    if (oldHandler) {
+        log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
+            "The old handler will be ignored as long as a new handler is set.");
+    }
+    switch (action) {
+        case SHUTDOWN_CLIENT:
+            log.error("Encountered the following exception during processing " +
+                "and the registered exception handler opted to " + action + "." +
+                " The streams client is going to shut down now. ", throwable);
+            close(Duration.ZERO);
+            break;
+        case SHUTDOWN_APPLICATION:
+            if (throwable instanceof Error) {
+                log.error("This option requires running threads to shut down the application." +
+                    "but the uncaught exception was an Error, which means this runtime is no " +
+                    "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
+            }
+            if (Thread.currentThread().equals(globalStreamThread) && threads.stream().noneMatch(StreamThread::isRunning)) {
+                log.error("Exception in global thread caused the application to attempt to shutdown." +
+                    " This action will succeed only if there is at least one StreamThread running on this client." +
+                    " Currently there are no running threads so will now close the client.");
+                close(Duration.ZERO);

Review comment:
   I think it makes more sense to transition to ERROR in this case than to 
NOT_RUNNING. But let's put t

[GitHub] [kafka] hachikuji commented on pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-11-17 Thread GitBox


hachikuji commented on pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#issuecomment-729316632


   @jacky1193610322 I missed this comment before. It's a good question. In 
general, the leader will continue in its current state as long as possible. As 
you say, as soon as it needs to shrink/expand the ISR, it grabs the 
leaderAndIsr update and attempts to synchronously update the state. If 
Zookeeper can't be reached, then the thread gets stuck. Eventually this causes 
the broker to effectively deadlock, which has the side effect of preventing any 
Produce requests (and any other requests) from getting through.
   
   I think it's a fair point that this affords some protection for acks=1 
requests, but I think we tend to view the side effect of deadlocking the broker 
as worse than any benefit. In KIP-500, we have an alternative approach for 
self-fencing. The analogous case is when the leader cannot reach the 
controller. We use a heartbeating mechanism to maintain liveness in the 
cluster. Unlike with Zookeeper, we do not rely on the session expiration event 
in order to tell that a broker has been declared dead. Instead if we do not get 
a heartbeat response from the controller before some timeout, then we will stop 
accepting Produce requests. 
   
   I have been thinking a little bit about your suggestion to self-fence after 
getting an invalid version error from AlterIsr. It might help in the interim 
before KIP-500 is complete. I think our expectation here was that if we get an 
invalid version error, then the LeaderAndIsr with the updated state should soon 
be on the way. I suppose we could come up with reasons why that assumption 
might fail, so it might make sense to be a little more defensive. I will file a 
jira about this and we can see what others think. Thanks for the suggestion!
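The heartbeat-based self-fencing idea described above can be sketched as follows. The class and method names, and the use of a caller-supplied logical clock, are illustrative assumptions, not the KIP-500 implementation:

```java
// Hedged sketch: a broker tracks the last successful heartbeat response from
// the controller and stops accepting Produce requests once the configured
// timeout elapses without one, instead of relying on a session-expiration event.
public class HeartbeatFence {
    private final long timeoutMs;
    private volatile long lastHeartbeatResponseMs;

    public HeartbeatFence(long timeoutMs, long nowMs) {
        this.timeoutMs = timeoutMs;
        this.lastHeartbeatResponseMs = nowMs;
    }

    // called whenever a heartbeat response arrives from the controller
    public void onHeartbeatResponse(long nowMs) {
        lastHeartbeatResponseMs = nowMs;
    }

    // true while the broker still considers itself live
    public boolean shouldAcceptProduce(long nowMs) {
        return nowMs - lastHeartbeatResponseMs < timeoutMs;
    }

    public static void main(String[] args) {
        HeartbeatFence fence = new HeartbeatFence(1000, 0);
        System.out.println(fence.shouldAcceptProduce(500));  // true: within the timeout
        System.out.println(fence.shouldAcceptProduce(1500)); // false: self-fenced
        fence.onHeartbeatResponse(1500);                     // controller reachable again
        System.out.println(fence.shouldAcceptProduce(2000)); // true: fence lifted
    }
}
```

The key difference from the Zookeeper path is that fencing is a local, time-based decision, so the broker never deadlocks waiting on a synchronous metadata update.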







[GitHub] [kafka] mattwong949 opened a new pull request #9608: MINOR: Enable testLogCleanerStats

2020-11-17 Thread GitBox


mattwong949 opened a new pull request #9608:
URL: https://github.com/apache/kafka/pull/9608


   The `testLogCleanerStats` test in `LogCleanerTest.scala` was implemented but 
never enabled. This PR adds the @Test annotation and also gives the test a 
larger map so that it passes as intended.







[GitHub] [kafka] ableegoldman commented on pull request #9568: KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops

2020-11-17 Thread GitBox


ableegoldman commented on pull request #9568:
URL: https://github.com/apache/kafka/pull/9568#issuecomment-729307123


   Merged to trunk. Will cherrypick back to 2.7 once the ongoing release 
completes







[GitHub] [kafka] ableegoldman merged pull request #9568: KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops

2020-11-17 Thread GitBox


ableegoldman merged pull request #9568:
URL: https://github.com/apache/kafka/pull/9568


   







[GitHub] [kafka] ableegoldman commented on pull request #9568: KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops

2020-11-17 Thread GitBox


ableegoldman commented on pull request #9568:
URL: https://github.com/apache/kafka/pull/9568#issuecomment-729306388


   All tests pass, but the build overall is broken due to failure of the new 
`Travis CI - Pull Request` thing. I can't find anything in the results that 
indicate an actual problem, so I'm just going to merge this







[jira] [Created] (KAFKA-10734) Speedup the processing of LeaderAndIsr request

2020-11-17 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-10734:
--

 Summary: Speedup the processing of LeaderAndIsr request
 Key: KAFKA-10734
 URL: https://issues.apache.org/jira/browse/KAFKA-10734
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
Assignee: Lucas Wang


Consider the case where a LeaderAndIsr request contains many partitions, of 
which the broker is asked to become the follower. Let's call these partitions 
*partitionsToMakeFollower*. Further more, let's assume the cluster has n 
brokers and each broker is configured to have m replica fetchers (via the 
num.replica.fetchers config). 
The broker is likely to have (n-1) * m fetcher threads.
Processing the LeaderAndIsr request requires
1. removing the "partitionsToMakeFollower" from all of the fetcher threads 
sequentially so that they won't be fetching from obsolete leaders.

2. adding the "partitionsToMakeFollower" to all of the fetcher threads 
sequentially

3. shutting down the idle fetcher threads sequentially (by checking the number 
of partitions held by each fetcher thread)

On top of that, for each of the 3 operations above, the operation is handled by 
the request handler thread (i.e. io thread). And to complete the operation, the 
request handler thread needs to contend for the "partitionMapLock" with the 
corresponding fetcher thread. In the worst case, the request handler thread is 
blocked for (n-1) * m times for removing the partitions, another (n-1) * m 
times for adding the partitions, and yet another (n-1) * m times for shutting 
down the idle fetcher threads.

Overall, all of the blocking can result in a significant delay in processing 
the LeaderAndIsr request. The further implication is that if the follower 
delays its fetching from the leader, there could be under-MinISR partitions in 
the cluster, causing unavailability for clients.

This ticket is created to track speedup in the processing of the LeaderAndIsr 
request.
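The scale of the contention described above can be made concrete with a small back-of-the-envelope calculation (the broker and fetcher counts below are illustrative, not from the ticket):

```java
public class FetcherLockEstimate {
    // Worst-case number of times the request handler thread contends for the
    // "partitionMapLock" while processing one LeaderAndIsr request: one pass
    // each for removing partitions, adding partitions, and shutting down idle
    // fetchers, across all (n - 1) * m fetcher threads.
    public static long worstCaseLockAcquisitions(int n, int m) {
        long fetcherThreads = (long) (n - 1) * m;
        return 3 * fetcherThreads;
    }

    public static void main(String[] args) {
        // e.g. a 31-broker cluster with num.replica.fetchers=4
        System.out.println(worstCaseLockAcquisitions(31, 4)); // 360
    }
}
```

Even modest per-acquisition blocking multiplied by hundreds of lock handoffs explains the significant delay in LeaderAndIsr processing.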





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9539: KAFKA-10634: Adding LeaderId to Voters list in LeaderChangeMessage

2020-11-17 Thread GitBox


hachikuji commented on a change in pull request #9539:
URL: https://github.com/apache/kafka/pull/9539#discussion_r525603946



##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -394,7 +394,7 @@ public void transitionToLeader(long epochStartOffset) throws IOException {
 localId,
 epoch(),
 epochStartOffset,
-voters
+voters, candidateState.grantingVoters()

Review comment:
   nit: can we move to next line to follow the convention here?

##
File path: clients/src/main/resources/common/message/LeaderChangeMessage.json
##
@@ -22,7 +22,9 @@
 {"name": "LeaderId", "type": "int32", "versions": "0+",
   "about": "The ID of the newly elected leader"},
 {"name": "Voters", "type": "[]Voter", "versions": "0+",
-  "about": "The voters who voted for the current leader"}
+  "about": "The voters who voted at the time of election"},
+{"name": "EndorsingVoters", "type": "[]Voter", "versions": "0+",

Review comment:
   Could we use "Granting" instead of "Endorsing"? I think this is a more 
consistent term considering `LeaderState`.

##
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##
@@ -709,6 +709,9 @@ static void verifyLeaderChangeMessage(
 assertEquals(leaderId, leaderChangeMessage.leaderId());
 assertEquals(voters.stream().map(voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toList()),
     leaderChangeMessage.voters());
+assertEquals(voters.stream().map(voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toSet()),

Review comment:
   Hmm.. It's curious that we can always rely on the full voter set. Do we 
not have any test cases where we have not received votes from all members?









[GitHub] [kafka] hachikuji commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

2020-11-17 Thread GitBox


hachikuji commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r525598620



##
File path: clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java
##
@@ -24,4 +24,11 @@
 
 void onComplete(ClientResponse response);
 
+/**
+ * Fire when the request transmission hits a fatal exception.
+ *
+ * @param exception the thrown exception
+ */
+default void onFailure(RuntimeException exception) {

Review comment:
   Here is the interface for `KafkaClient`:
   ```
ClientRequest newClientRequest(String nodeId,
                               AbstractRequest.Builder<?> requestBuilder,
                               long createdTimeMs,
                               boolean expectResponse,
                               int requestTimeoutMs,
                               RequestCompletionHandler callback);
   ```
   It is misleading to add an `onFailure` callback to 
`RequestCompletionHandler` if it is not going to be used by `KafkaClient` 
implementations such as `NetworkClient`. The usage in `ConsumerNetworkClient` 
is different because it is internal. In general, we should avoid leaking 
implementation details up to the interfaces.
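One way to keep the failure path internal without widening the public callback interface is an internal subtype, sketched here with simplified stand-in types (these names and signatures are hypothetical, not Kafka's actual API):

```java
// Public callback: implementations of the client interface only ever see this.
interface RequestCompletionHandler {
    void onComplete(String response); // String stands in for ClientResponse
}

// Internal-only extension: richer callers can also observe fatal send failures,
// without leaking onFailure into the public interface.
interface InternalRequestCompletionHandler extends RequestCompletionHandler {
    void onFailure(RuntimeException exception);
}

class Dispatcher {
    // Internal code checks for the richer handler before delivering a failure.
    static void completeExceptionally(RequestCompletionHandler handler, RuntimeException e) {
        if (handler instanceof InternalRequestCompletionHandler) {
            ((InternalRequestCompletionHandler) handler).onFailure(e);
        }
    }

    public static void main(String[] args) {
        RequestCompletionHandler plain = response -> System.out.println("done: " + response);
        // A plain handler simply never receives failure callbacks.
        completeExceptionally(plain, new RuntimeException("boom"));
        System.out.println("no failure delivered to the plain handler");
    }
}
```

This keeps `KafkaClient` implementations such as `NetworkClient` unaware of the failure path, which is the concern raised in the review.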









[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-17 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r525586697



##
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##
@@ -560,13 +562,24 @@ private void handleProduceResponse(ClientResponse response, Map...
https://issues.apache.org/jira/browse/KAFKA-10696

Review comment:
   nit: since we have the jira for tracking, can we remove the TODO? A few 
more of these in the PR.

##
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##
@@ -100,16 +113,92 @@ public void produceResponseRecordErrorsTest() {
         ProduceResponse response = new ProduceResponse(responseData);
         Struct struct = response.toStruct(ver);
         assertEquals("Should use schema version " + ver, ApiKeys.PRODUCE.responseSchema(ver), struct.schema());
-        ProduceResponse.PartitionResponse deserialized = new ProduceResponse(struct).responses().get(tp);
+        ProduceResponse.PartitionResponse deserialized = new ProduceResponse(new ProduceResponseData(struct, ver)).responses().get(tp);
         if (ver >= 8) {
             assertEquals(1, deserialized.recordErrors.size());
             assertEquals(3, deserialized.recordErrors.get(0).batchIndex);
             assertEquals("Record error", deserialized.recordErrors.get(0).message);
             assertEquals("Produce failed", deserialized.errorMessage);
         } else {
             assertEquals(0, deserialized.recordErrors.size());
-            assertEquals(null, deserialized.errorMessage);
+            assertNull(deserialized.errorMessage);
         }
     }
 }
+
+/**
+ * the schema in this test is from previous code and the automatic protocol should be compatible to previous schema.
+ */
+@Test
+public void testCompatibility() {

Review comment:
   I think this test might be overkill. We haven't done anything like this 
for the other converted APIs. It's a little similar to 
`MessageTest.testRequestSchemas`, which was useful verifying the generated 
schemas when the message generator was being written. Soon `testRequestSchemas` 
will be redundant, so I guess we have to decide if we just trust the generator 
and our compatibility system tests or if we want some other canonical 
representation. Thoughts?









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525582298



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
     rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private long getCacheSizePerThread(final int numStreamThreads) {
+    return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+}
+
+private void resizeThreadCache(final int numStreamThreads) {
+    if (numStreamThreads < 0) {

Review comment:
   that is a good point









[GitHub] [kafka] mjsax commented on a change in pull request #9607: [KAFKA-10722] doc: Described the types of the stores used

2020-11-17 Thread GitBox


mjsax commented on a change in pull request #9607:
URL: https://github.com/apache/kafka/pull/9607#discussion_r525581704



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -870,6 +870,12 @@ Stateful transformations depend on state for processing inputs and producing outputs and require a state store associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per
 window. In join operations, a windowing state store is used to collect all of the records received so far within the
 defined window boundary.
+   Note: Following store types are used regardless of the possibly specified type (via the parameter materialized):
+   
+   non-windowed aggregations and plain KTables use TimestampedKeyValueStores
+   time-windowed aggregations and kstream-kstream joins use TimestampedWindowStores

Review comment:
   `kstream-kstream` -> `KStream-KStream`

##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -870,6 +870,12 @@ Stateful transformations depend on state for processing inputs and producing outputs and require a state store associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per
 window. In join operations, a windowing state store is used to collect all of the records received so far within the
 defined window boundary.
+   Note: Following store types are used regardless of the possibly specified type (via the parameter materialized):
+   
+   non-windowed aggregations and plain KTables use TimestampedKeyValueStores

Review comment:
   Maybe we should say `non-windowed KTable` -- "plain" does not sound 
technical.









[GitHub] [kafka] mjsax commented on a change in pull request #9606: [KAFKA-10722] doc: Improve JavaDoc for KGroupedStream.aggregate

2020-11-17 Thread GitBox


mjsax commented on a change in pull request #9606:
URL: https://github.com/apache/kafka/pull/9606#discussion_r525580343



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
##
@@ -381,7 +381,8 @@
  * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
  *
  * 
- * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by
+ * an internal changelog topic that will be created in Kafka.

Review comment:
   Should apply the same improvement to `reduce()` and `count()` overloads? 
Also for `CogroupedKStream#aggregate()`?
   
   What about `TimeWindowedKStream` and `TimeWindowedCogroupedKStream` ?
   
   Also `StreamsBuilder#table()` (and `#globalTable()`) might need an update?

##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
##
@@ -438,7 +439,8 @@
  * query the value of the key on a parallel running instance of your Kafka 
Streams application.
  *
  * 
- * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore} -- regardless of what
+ * is specified in the parameter {@materialized}) will be backed by an internal changelog topic that will be created in Kafka.

Review comment:
   `{@materialized}` is not valid markup as far as I know. Should we 
`{@code materialized}`? (same below)









[GitHub] [kafka] hachikuji commented on pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-17 Thread GitBox


hachikuji commented on pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#issuecomment-729258346


   Here are a couple additional test runs. This was on Ubuntu 20 
(ami-00831fc7c1e3ddc60). The machine type was m5a.xlarge with 200GB gp2 EBS 
storage. One instance was running the broker and one instance was running the 
producer perf test.
   
   Commands:
   ```
   bin/kafka-topics.sh --create --topic foo --replication-factor 1 --partitions 
10 --bootstrap-server $BROKER
   bin/kafka-producer-perf-test.sh --topic foo --num-records 25000 
--throughput -1  --record-size 256 --producer-props bootstrap.servers=$BROKER
   ```
   
   Here are the results:
   ```
   Patch:
   25000 records sent, 826003.925171 records/sec (201.66 MB/sec), 149.39 ms 
avg latency, 1623.00 ms max latency, 131 ms 50th, 380 ms 95th, 464 ms 99th, 650 
ms 99.9th.
   25000 records sent, 825684.740355 records/sec (201.58 MB/sec), 149.22 ms 
avg latency, 1276.00 ms max latency, 124 ms 50th, 364 ms 95th, 451 ms 99th, 775 
ms 99.9th.
   
   Trunk:
   25000 records sent, 833144.487250 records/sec (203.40 MB/sec), 148.20 ms 
avg latency, 1361.00 ms max latency, 111 ms 50th, 437 ms 95th, 551 ms 99th, 807 
ms 99.9th.
   25000 records sent, 810927.409022 records/sec (197.98 MB/sec), 152.59 ms 
avg latency, 1430.00 ms max latency, 127 ms 50th, 382 ms 95th, 467 ms 99th, 809 
ms 99.9th.
   ```
   
   Given variance in these tests, I think we're probably in line with trunk. I 
looked at the flame graph as well and did not observe any substantial 
difference in performance. Here are a few interesting highlights from one run. 
This patch is listed first with trunk second.
   
   `Sender.sendProduceRequests`:
   
   ![Screen Shot 2020-11-17 at 2 36 34 
PM](https://user-images.githubusercontent.com/12502538/99459188-89875600-28e2-11eb-93ed-1ffb46b74e63.png)
   ![Screen Shot 2020-11-17 at 2 36 49 
PM](https://user-images.githubusercontent.com/12502538/99459193-8b511980-28e2-11eb-9d19-51bbc2d7ca56.png)
   
   `KafkaChannel.write`:
   
   ![Screen Shot 2020-11-17 at 2 32 56 
PM](https://user-images.githubusercontent.com/12502538/99459229-9c018f80-28e2-11eb-9431-61a450291a72.png)
   ![Screen Shot 2020-11-17 at 2 33 17 
PM](https://user-images.githubusercontent.com/12502538/99459235-9e63e980-28e2-11eb-92fe-0a472fbee0d6.png)
   
   
   
   







[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525562156



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
     rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private long getCacheSizePerThread(final int numStreamThreads) {
+    return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+}
+
+private void resizeThreadCache(final int numStreamThreads) {
+    if (numStreamThreads < 0) {

Review comment:
   Well, `getCacheSizePerThread` would eventually return zero (with a growing 
number of threads), which means that every put() into the cache would result in 
an immediate eviction. So I don't think we need to do anything for this corner 
case.
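The integer division discussed here can be sketched in isolation (the method signature and field names below are illustrative, not the actual `KafkaStreams` internals):

```java
public class CacheSizing {
    // Per-thread share of the total cache, with one extra share reserved for
    // a global thread if a global topology is present.
    public static long cacheSizePerThread(long totalCacheSize, int numStreamThreads, boolean hasGlobalTopology) {
        return totalCacheSize / (numStreamThreads + (hasGlobalTopology ? 1 : 0));
    }

    public static void main(String[] args) {
        long total = 10L * 1024 * 1024; // 10 MiB total cache
        System.out.println(cacheSizePerThread(total, 9, true));  // 1048576: 1 MiB each
        // with enough threads the share rounds down to 0, so every cache put
        // triggers an immediate eviction, as noted in the review
        System.out.println(cacheSizePerThread(total, 20_000_000, true)); // 0
    }
}
```

A zero-sized per-thread cache is slow but safe, which is why the review concludes no extra guard is needed for this corner case.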









[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525540418



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
     rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private long getCacheSizePerThread(final int numStreamThreads) {
+    return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+}
+
+private void resizeThreadCache(final int numStreamThreads) {
+    if (numStreamThreads < 0) {

Review comment:
   Yes, it can be zero, but the check says `< 0`, so it would always 
evaluate to false?
   
   And if we have zero threads, we should not resize the cache, as we might end 
up in an infinite loop? But we would only call this method if we "shrink", i.e., 
if the thread count grows, but it can never grow from negative to zero, right?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525561178



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private long getCacheSizePerThread(final int numStreamThreads) {
+return totalCacheSize / (numStreamThreads + ((globalTaskTopology != 
null) ? 1 : 0));
+}
+
+private void resizeThreadCache(final int numStreamThreads) {
+if (numStreamThreads < 0) {

Review comment:
   That is a good point. Maybe what we need to do is put a minimum cache size 
to limit how many stream threads an instance can have?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

2020-11-17 Thread GitBox


guozhangwang commented on a change in pull request #9569:
URL: https://github.com/apache/kafka/pull/9569#discussion_r525559599



##
File path: clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
##
@@ -40,6 +40,7 @@
  *  RecordTooLargeException
  *  UnknownServerException
  *  UnknownProducerIdException
+ *  InvalidProducerEpoch

Review comment:
   nit: `InvalidProducerEpochException`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10733) Enforce exception thrown for KafkaProducer txn APIs

2020-11-17 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10733:
---

 Summary: Enforce exception thrown for KafkaProducer txn APIs
 Key: KAFKA-10733
 URL: https://issues.apache.org/jira/browse/KAFKA-10733
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


In general, KafkaProducer could throw both fatal and non-fatal errors as 
KafkaException, which makes the exception catching hard. Furthermore, not every 
single fatal exception (checked) is marked on the function signature yet as of 
2.7.

We should have a general long-term strategy for this matter, such as whether to 
declare all non-fatal exceptions as wrapped KafkaException while extracting all 
fatal ones, or to just add a flag to KafkaException indicating whether it is 
fatal, in order to maintain binary compatibility.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10722) Timestamped store is used even if not desired

2020-11-17 Thread fml2 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17234030#comment-17234030
 ] 

fml2 commented on KAFKA-10722:
--

I've created [https://github.com/apache/kafka/pull/9606] and 
[https://github.com/apache/kafka/pull/9607].

> Timestamped store is used even if not desired
> -
>
> Key: KAFKA-10722
> URL: https://issues.apache.org/jira/browse/KAFKA-10722
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1, 2.6.0
>Reporter: fml2
>Priority: Major
>
> I have a stream which I then group and aggregate (this results in a KTable). 
> When aggregating, I explicitly tell to materialize the result table using a 
> usual (not timestamped) store.
> After that, the KTable is filtered and streamed. This stream is processed by 
> a processor that accesses the store.
> The problem/bug is that even if I specify a non-timestamped store, a 
> timestamped one is used, which leads to a ClassCastException in the processor 
> (it iterates over the store and expects the items to be of type "KeyValue" 
> but they are of type "ValueAndTimestamp").
> Here is the code (schematically).
> First, I define the topology:
> {code:java}
> KTable table = ...aggregate(
>   initializer, // initializer for the KTable row
>   aggregator, // aggregator
>   Materialized.as(Stores.persistentKeyValueStore("MyStore")) // <-- 
> Non-Timestamped!
> .withKeySerde(...).withValueSerde(...));
> table.toStream().process(theProcessor);
> {code}
> In the class for the processor:
> {code:java}
> public void init(ProcessorContext context) {
>var store = context.getStateStore("MyStore"); // Returns a 
> TimestampedKeyValueStore!
> }
> {code}
> A timestamped store is returned even if I explicitly told it to use a 
> non-timestamped one!
>  
> I tried to find the cause for this behaviour and think that I've found it. It 
> lies in this line: 
> [https://github.com/apache/kafka/blob/cfc813537e955c267106eea989f6aec4879e14d7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L241]
> There, TimestampedKeyValueStoreMaterializer is used regardless of whether 
> the materialization supplier is a timestamped one or not.
> I think this is a bug.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
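To make the ClassCastException described in the issue above concrete, here is a hedged, dependency-free sketch (the `ValueAndTimestamp` record below is a minimal stand-in, not the Kafka Streams class): values coming out of a timestamped store are wrapped and must be unwrapped, not cast to the raw value type.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Dependency-free model of the mismatch: the processor expects raw values,
// but a timestamped store hands back wrapped ones.
public class TimestampedStoreSketch {

    // Minimal stand-in for Kafka Streams' ValueAndTimestamp wrapper.
    record ValueAndTimestamp<V>(V value, long timestamp) {}

    // Unwrap if the store returned a wrapped value; otherwise pass through.
    static Object unwrap(Object raw) {
        return (raw instanceof ValueAndTimestamp<?> vat) ? vat.value() : raw;
    }

    public static void main(String[] args) {
        Map<String, Object> store = new LinkedHashMap<>();
        store.put("key", new ValueAndTimestamp<>("agg-result", 1605600000000L));

        // Casting store.get("key") directly to String would throw
        // ClassCastException; unwrapping first is safe.
        System.out.println(unwrap(store.get("key"))); // agg-result
    }
}
```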


[GitHub] [kafka] fml2 opened a new pull request #9607: [KAFKA-10722] doc: Described the types of the stores used

2020-11-17 Thread GitBox


fml2 opened a new pull request #9607:
URL: https://github.com/apache/kafka/pull/9607


   This is related to KAFKA-10722
   
   Sometimes it's important to know the correct type
   of the store used by streams. E.g. when iterating over
   its items.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525541523



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##
@@ -71,6 +72,22 @@ public long flushes() {
 return numFlushes;
 }
 
+public void resize(final long newCacheSizeBytes) {
+final boolean shrink = newCacheSizeBytes < this.maxCacheSizeBytes;

Review comment:
   yep





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #9602: MINOR: Use string interpolation in FinalizedFeatureCache

2020-11-17 Thread GitBox


kowshik commented on pull request #9602:
URL: https://github.com/apache/kafka/pull/9602#issuecomment-729226900


   @dajac thanks for the review :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525536944



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private long getCacheSizePerThread(final int numStreamThreads) {
+return totalCacheSize / (numStreamThreads + ((globalTaskTopology != 
null) ? 1 : 0));
+}
+
+private void resizeThreadCache(final int numStreamThreads) {
+if (numStreamThreads < 0) {

Review comment:
   It can be zero if you have a global thread, but since this is internal 
the check might not be entirely necessary





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525534167



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##
@@ -71,6 +72,22 @@ public long flushes() {
 return numFlushes;
 }
 
+public void resize(final long newCacheSizeBytes) {
+final boolean shrink = newCacheSizeBytes < this.maxCacheSizeBytes;

Review comment:
   nit: we can remove `this.` now (same next line)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525533593



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private long getCacheSizePerThread(final int numStreamThreads) {
+return totalCacheSize / (numStreamThreads + ((globalTaskTopology != 
null) ? 1 : 0));
+}
+
+private void resizeThreadCache(final int numStreamThreads) {
+if (numStreamThreads < 0) {

Review comment:
   Can it be smaller than `0` ? Should the test be `<= 0` or `< 1` instead?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525532300



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private void resizeThreadCache(final int numStreamThreads) {
+final long cacheSizePreThread = totalCacheSize / (numStreamThreads + 
((globalTaskTopology != null) ? 1 : 0));

Review comment:
   > LGTM?
   
   If this is a question, should it be LGTY? 😂 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] fml2 opened a new pull request #9606: doc: Improve JavaDoc for KGroupedStream.aggregate

2020-11-17 Thread GitBox


fml2 opened a new pull request #9606:
URL: https://github.com/apache/kafka/pull/9606


   Tell that the store used internally is always a timestamped one.
   
   This is related to KAFKA-10722.
   
   No tests are necessary because only JavaDoc was changed.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

2020-11-17 Thread GitBox


kowshik commented on a change in pull request #9596:
URL: https://github.com/apache/kafka/pull/9596#discussion_r525527982



##
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##
@@ -83,6 +87,51 @@ class LogManagerTest {
 log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), 
leaderEpoch = 0)
   }
 
+  /**
+   * Tests that all internal futures are completed before 
LogManager.shutdown() returns to the
+   * caller during error situations.
+   */
+  @Test
+  def testHandlingExceptionsDuringShutdown(): Unit = {
+logManager.shutdown()
+
+// We create two directories logDir1 and logDir2 to help effectively test 
error handling
+// during LogManager.shutdown().
+val logDir1 = TestUtils.tempDir()
+val logDir2 = TestUtils.tempDir()
+logManager = createLogManager(Seq(logDir1, logDir2))
+assertEquals(2, logManager.liveLogDirs.size)
+logManager.startup()
+
+val log1 = logManager.getOrCreateLog(new TopicPartition(name, 0), () => 
logConfig)
+val log2 = logManager.getOrCreateLog(new TopicPartition(name, 1), () => 
logConfig)
+
+val logFile1 = new File(logDir1, name + "-0")
+assertTrue(logFile1.exists)
+val logFile2 = new File(logDir2, name + "-1")
+assertTrue(logFile2.exists)
+
+log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), 
leaderEpoch = 0)
+log1.takeProducerSnapshot()
+log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), 
leaderEpoch = 0)
+
+log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), 
leaderEpoch = 0)
+log2.takeProducerSnapshot()
+log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), 
leaderEpoch = 0)
+
+// This should cause log1.close() to fail during LogManger shutdown 
sequence.
+FileUtils.deleteDirectory(logFile1)

Review comment:
   It depends on the kind of error, but today we do log the error information 
from within `KafkaServer.shutdown()`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

2020-11-17 Thread GitBox


abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r525507232



##
File path: 
clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java
##
@@ -24,4 +24,11 @@
 
 void onComplete(ClientResponse response);
 
+/**
+ * Fire when the request transmission hits a fatal exception.
+ *
+ * @param exception the thrown exception
+ */
+default void onFailure(RuntimeException exception) {

Review comment:
   We do have a case in `ConsumerNetworkClient` which adds an `onFailure` 
callback. To me it makes sense to include it as part of the 
RequestCompletionHandler interface. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
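The default-method approach discussed above can be sketched without any Kafka dependencies (the interface and names below are illustrative, not the actual `RequestCompletionHandler`): a `default` body keeps every existing implementor compiling while new callers gain a failure hook.

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackSketch {

    interface CompletionHandler {
        void onComplete(String response);

        // A default method keeps existing implementations source- and
        // binary-compatible while adding a failure hook.
        default void onFailure(RuntimeException exception) {
            // no-op unless an implementor overrides it
        }
    }

    // Exercise both callbacks and record what the handler observed.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        CompletionHandler handler = new CompletionHandler() {
            @Override public void onComplete(String response) { log.add("ok:" + response); }
            @Override public void onFailure(RuntimeException e) { log.add("fail:" + e.getMessage()); }
        };
        handler.onComplete("r1");
        handler.onFailure(new RuntimeException("timeout"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [ok:r1, fail:timeout]
    }
}
```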




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525488664



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private void resizeThreadCache(final int numStreamThreads) {
+final long cacheSizePreThread = totalCacheSize / (numStreamThreads + 
((globalTaskTopology != null) ? 1 : 0));

Review comment:
   Moved to a new method. Glad we got that cleared up. LGTM?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525464495



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##
@@ -71,6 +72,26 @@ public long flushes() {
 return numFlushes;
 }
 
+public void resize(final long maxCacheSizeBytes) {
+final boolean shrink = maxCacheSizeBytes < this.maxCacheSizeBytes;
+this.maxCacheSizeBytes = maxCacheSizeBytes;
+if (shrink) {
+final CircularIterator circularIterator = new 
CircularIterator<>(caches.values());
+while (sizeBytes() > maxCacheSizeBytes) {
+if (!circularIterator.hasNext()) {
+log.error("Unable to remove any more entries as all caches 
are empty");

Review comment:
   Yeah, in retrospect it was not very clear. Hopefully it's better this way 
now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
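The shrink path under review can be modeled with plain collections (a hedged sketch only; `ThreadCache`'s real eviction logic is more involved): evict round-robin across the per-store caches until total usage fits the new budget, with the loop terminating naturally once every cache is empty.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class ResizeSketch {

    // Sum the byte size of every entry across all per-store caches.
    static long sizeBytes(List<Deque<byte[]>> caches) {
        long total = 0;
        for (Deque<byte[]> cache : caches) {
            for (byte[] entry : cache) total += entry.length;
        }
        return total;
    }

    // Evict oldest entries, cycling over the caches, until under the budget.
    // Once every cache is empty, sizeBytes() is 0 and the loop exits, so a
    // non-negative budget cannot loop forever.
    static void resize(List<Deque<byte[]>> caches, long newMaxBytes) {
        int i = 0;
        while (sizeBytes(caches) > newMaxBytes) {
            Deque<byte[]> cache = caches.get(i % caches.size());
            if (!cache.isEmpty()) cache.pollFirst();
            i++;
        }
    }

    public static void main(String[] args) {
        List<Deque<byte[]>> caches = List.of(new ArrayDeque<>(), new ArrayDeque<>());
        caches.get(0).add(new byte[600]);
        caches.get(0).add(new byte[600]);
        caches.get(1).add(new byte[600]);
        resize(caches, 1000); // evictions shrink usage 1800 -> 1200 -> 600 bytes
        System.out.println(sizeBytes(caches)); // 600
    }
}
```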




[GitHub] [kafka] cadonna commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


cadonna commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525463645



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private void resizeThreadCache(final int numStreamThreads) {
+final long cacheSizePreThread = totalCacheSize / (numStreamThreads + 
((globalTaskTopology != null) ? 1 : 0));

Review comment:
   If this line is duplicated, it should go in a method. When I proposed to 
move it inline, I was apparently not aware that the same line was used 
somewhere else.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525463165



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -589,6 +593,10 @@ private void subscribeConsumer() {
 }
 }
 
+public void resizeCache(final long size) {
+cacheResizer.accept(size);

Review comment:
   I think we can, that's probably a good idea.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525459352



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private void resizeThreadCache(final int numStreamThreads) {
+final long cacheSizePreThread = totalCacheSize / (numStreamThreads + 
((globalTaskTopology != null) ? 1 : 0));

Review comment:
   I think it was about readability. I might be misremembering though, as 
it was a conversation we had last week.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525444958



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when an {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * The handler will execute on the thread that produced the exception.
+ * In order to get the thread use that threw the exception, 
Thread.currentThread().

Review comment:
   need to remove `use`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525442248



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when an {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * The handler will execute on the thread that produced the exception.
+ * In order to get the thread use that threw the exception, 
Thread.currentThread().
+ * 
+ * Note, this handler must be threadsafe, since it will be shared among 
all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler 
of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final Consumer handler = exception -> 
handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+Objects.requireNonNull(streamsUncaughtExceptionHandler);
+for (final StreamThread thread : threads) {
+thread.setStreamsUncaughtExceptionHandler(handler);
+}
+if (globalStreamThread != null) {
+globalStreamThread.setUncaughtExceptionHandler(handler);
+}
+} else {
+throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
+"Current state is: " + state);
+}
+}
+}
+
+private void defaultStreamsUncaughtExceptionHandler(final Throwable 
throwable) {
+if (oldHanlder) {

Review comment:
   oops





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
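The threadsafe-handler requirement quoted in the javadoc above can be illustrated with plain `java.lang.Thread` (a sketch only; `StreamsUncaughtExceptionHandler` is the actual Kafka Streams type): the shared handler runs on whichever thread threw, so any state it touches must be safe for concurrent use.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class HandlerSketch {

    // Count uncaught exceptions from a worker thread via a shared,
    // threadsafe handler; the handler executes on the throwing thread.
    static int countUncaught() {
        AtomicInteger caught = new AtomicInteger(); // safe for concurrent use
        Thread worker = new Thread(() -> {
            throw new NullPointerException("from processor logic");
        });
        worker.setUncaughtExceptionHandler((thread, throwable) -> caught.incrementAndGet());
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return caught.get();
    }

    public static void main(String[] args) {
        System.out.println(countUncaught()); // 1
    }
}
```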




[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525439819



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##
@@ -71,6 +72,26 @@ public long flushes() {
 return numFlushes;
 }
 
+public void resize(final long maxCacheSizeBytes) {
+final boolean shrink = maxCacheSizeBytes < this.maxCacheSizeBytes;
+this.maxCacheSizeBytes = maxCacheSizeBytes;
+if (shrink) {
+final CircularIterator circularIterator = new 
CircularIterator<>(caches.values());
+while (sizeBytes() > maxCacheSizeBytes) {
+if (!circularIterator.hasNext()) {
+log.error("Unable to remove any more entries as all caches 
are empty");

Review comment:
   I see. -- I guess the misleading fact was that this check was done 
inside the while-loop.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525437647



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -589,6 +593,10 @@ private void subscribeConsumer() {
 }
 }
 
+public void resizeCache(final long size) {
+cacheResizer.accept(size);

Review comment:
   Ah. I see. -- Should we pass `java.util.function.Consumer 
cacheResizer` into `StreamThread` constructor for this case instead?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


cadonna commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525169791



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when an {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * The handler will execute on the thread that produced the exception.
+ * In order to get the thread use that threw the exception, 
Thread.currentThread().

Review comment:
   There is something wrong in this sentence.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+ * logic.
+ * The handler will execute on the thread that produced the exception.
+ * In order to get the thread use that threw the exception, Thread.currentThread().
+ *
+ * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+ * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+ */
+public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+    final Consumer<Throwable> handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+    synchronized (stateLock) {
+        if (state == State.CREATED) {
+            Objects.requireNonNull(streamsUncaughtExceptionHandler);
+            for (final StreamThread thread : threads) {
+                thread.setStreamsUncaughtExceptionHandler(handler);
+            }
+            if (globalStreamThread != null) {
+                globalStreamThread.setUncaughtExceptionHandler(handler);
+            }
+        } else {
+            throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                "Current state is: " + state);
+        }
+    }
+}
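The contract described in the javadoc above — one handler shared by all threads, executing on whichever thread threw — can be illustrated with plain JDK threads. This is a minimal sketch with no Kafka dependencies; `HandlerSketch` and the thread name are hypothetical, not part of the PR.

```java
import java.util.concurrent.atomic.AtomicReference;

public class HandlerSketch {

    // Runs a worker that throws and returns what the shared handler recorded.
    static String captureFailure() {
        final AtomicReference<String> lastFailure = new AtomicReference<>();

        // The handler may be invoked concurrently from any thread that dies
        // with an uncaught exception, so it only touches thread-safe state.
        final Thread.UncaughtExceptionHandler handler = (thread, exception) ->
            // The handler executes on the thread that threw the exception,
            // so Thread.currentThread() == thread here.
            lastFailure.set(thread.getName() + ": " + exception.getMessage());

        final Thread worker = new Thread(() -> {
            throw new NullPointerException("boom");
        }, "stream-thread-1");
        worker.setUncaughtExceptionHandler(handler);
        worker.start();
        try {
            // join() returns only after the worker has fully terminated,
            // which includes running its uncaught-exception handler.
            worker.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return lastFailure.get();
    }

    public static void main(final String[] args) {
        System.out.println(captureFailure()); // stream-thread-1: boom
    }
}
```

The same reasoning explains why the PR requires the handler to be set only in the `CREATED` state: once threads are running, swapping the handler would race with threads that may already be dying.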
+
+private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) {
+if (oldHanlder) {

Review comment:
   `oldHanlder` -> `oldHandler`

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -559,18 +552,52 @@ void runLoop() {
 }
 } catch (final TaskCorruptedException e) {
 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
- "Will close the task as dirty and re-create and bootstrap from scratch.", e);
+"Will close the task as dirty and re-create and bootstrap from scratch.", e);
 try {
 taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
 } catch (final TaskMigratedException taskMigrated) {
 handleTaskMigrated(taskMigrated);
 }
 } catch (final TaskMigratedException e) {
 handleTaskMigrated(e);
+} catch (final UnsupportedVersionException e) {
+final String errorMessage = e.getMessage();
+if (errorMessage != null &&
+errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {
+
+log.error("Shutting down because the Kafka cluster seems to be on a too old version. " +
+"Setting {}=\"{}\" requires broker version 2.5 or higher.",
+StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
+EXACTLY_ONCE_BETA);
+

Review comment:
   nit: remove line
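The `runLoop` change above dispatches first on exception type and, for `UnsupportedVersionException`, additionally on a message prefix to decide between rejoining and shutting down. A minimal sketch of that pattern follows; the nested exception classes are hypothetical stand-ins for Kafka's real types, and the returned strings merely label the branches.

```java
public class RunLoopSketch {

    // Hypothetical stand-ins for Kafka's internal exception types.
    static class TaskMigratedException extends RuntimeException { }
    static class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(final String message) { super(message); }
    }

    // Classifies a failure the way the runLoop above does: by exception
    // type first, then, for version errors, by inspecting the message
    // prefix to detect a broker too old for the requireStable flag.
    static String classify(final RuntimeException e) {
        if (e instanceof TaskMigratedException) {
            return "rejoin";                        // handleTaskMigrated(e)
        }
        if (e instanceof UnsupportedVersionException) {
            final String msg = e.getMessage();
            if (msg != null && msg.startsWith(
                    "Broker unexpectedly doesn't support requireStable flag on version ")) {
                return "shutdown: broker too old";  // needs broker 2.5+
            }
        }
        return "rethrow";                           // unexpected: propagate
    }

    public static void main(final String[] args) {
        System.out.println(classify(new TaskMigratedException()));
        System.out.println(classify(new UnsupportedVersionException(
            "Broker unexpectedly doesn't support requireStable flag on version 7")));
    }
}
```

Matching on a message prefix is fragile (the broker's wording is not part of any API contract), which is presumably why the code guards the check with a null test before shutting down.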






[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

2020-11-17 Thread GitBox


mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525435452



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private void resizeThreadCache(final int numStreamThreads) {
+final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment:
   Not sure why the `totalCacheSize` check is relevant for avoiding code duplication?
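The division in `resizeThreadCache` above gives each stream thread an equal share of the total cache, counting the global thread as one extra consumer when a global topology exists. A sketch of the arithmetic, with hypothetical names standing in for the fields in `KafkaStreams`:

```java
public class CacheShareSketch {

    // Divides totalCacheSize evenly across the stream threads, reserving
    // one extra share for the global thread when a global topology exists.
    static long cacheSizePerThread(final long totalCacheSize,
                                   final int numStreamThreads,
                                   final boolean hasGlobalTopology) {
        return totalCacheSize / (numStreamThreads + (hasGlobalTopology ? 1 : 0));
    }

    public static void main(final String[] args) {
        // 10 MiB split five ways (4 stream threads + global thread) ...
        System.out.println(cacheSizePerThread(10L * 1024 * 1024, 4, true));  // 2097152
        // ... versus four ways with no global topology.
        System.out.println(cacheSizePerThread(10L * 1024 * 1024, 4, false)); // 2621440
    }
}
```

Note that integer division silently discards any remainder, so the shares may sum to slightly less than the configured total.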





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



