[GitHub] [kafka] fml2 commented on a change in pull request #9607: [KAFKA-10722] doc: Described the types of the stores used
fml2 commented on a change in pull request #9607: URL: https://github.com/apache/kafka/pull/9607#discussion_r525874521 ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -870,6 +870,12 @@ Stateful transformations depend on state for processing inputs and producing outputs and require a state store associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per window. In join operations, a windowing state store is used to collect all of the records received so far within the defined window boundary. + Note: Following store types are used regardless of the possibly specified type (via the parameter materialized): + + non-windowed aggregations and plain KTables use TimestampedKeyValueStores + time-windowed aggregations and kstream-kstream joins use TimestampedWindowStores Review comment: OK, will do. Why did the build fail? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fml2 commented on a change in pull request #9607: [KAFKA-10722] doc: Described the types of the stores used
fml2 commented on a change in pull request #9607: URL: https://github.com/apache/kafka/pull/9607#discussion_r525874220 ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -870,6 +870,12 @@ Stateful transformations depend on state for processing inputs and producing outputs and require a state store associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per window. In join operations, a windowing state store is used to collect all of the records received so far within the defined window boundary. + Note: Following store types are used regardless of the possibly specified type (via the parameter materialized): + + non-windowed aggregations and plain KTables use TimestampedKeyValueStores Review comment: OK, will do.
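The doc note under review can be illustrated with a small Kafka Streams topology fragment. This is only a sketch (the topic name `words` and store name `counts-store` are made up for illustration): even though the `Materialized` parameter is typed against a plain `KeyValueStore`, a non-windowed aggregation is materialized internally as a `TimestampedKeyValueStore`.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class StoreTypeExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // A non-windowed aggregation. The Materialized parameter names a
        // plain KeyValueStore, but the store actually created (and backed
        // by the changelog topic) is a TimestampedKeyValueStore.
        builder.stream("words")   // hypothetical topic name
               .groupByKey()
               .count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
                                  .withValueSerde(Serdes.Long()));
    }
}
```

This matters for interactive queries: the store should be fetched with `QueryableStoreTypes.timestampedKeyValueStore()` rather than `keyValueStore()`.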
[GitHub] [kafka] fml2 commented on a change in pull request #9606: [KAFKA-10722] doc: Improve JavaDoc for KGroupedStream.aggregate
fml2 commented on a change in pull request #9606: URL: https://github.com/apache/kafka/pull/9606#discussion_r525873831 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java ## @@ -438,7 +439,8 @@ * query the value of the key on a parallel running instance of your Kafka Streams application. * * - * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore} -- regardless of what + * is specified in the parameter {@materialized}) will be backed by an internal changelog topic that will be created in Kafka. Review comment: Yes, actually, I wanted to write this but somehow... I will correct it. Should I then squash the commits?
[GitHub] [kafka] fml2 commented on a change in pull request #9606: [KAFKA-10722] doc: Improve JavaDoc for KGroupedStream.aggregate
fml2 commented on a change in pull request #9606: URL: https://github.com/apache/kafka/pull/9606#discussion_r525873511 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java ## @@ -381,7 +381,8 @@ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. * * - * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by + * an internal changelog topic that will be created in Kafka. Review comment: Yes, this should probably be done. But since I don't know how they work and what to write there, I'd prefer to do it in another PR (or someone else should do it).
[GitHub] [kafka] chia7712 commented on pull request #9599: MINOR: Include connector name in error message
chia7712 commented on pull request #9599: URL: https://github.com/apache/kafka/pull/9599#issuecomment-729500064 @C0urante Thanks for your patch. Merge to trunk!
[GitHub] [kafka] chia7712 merged pull request #9599: MINOR: Include connector name in error message
chia7712 merged pull request #9599: URL: https://github.com/apache/kafka/pull/9599
[GitHub] [kafka] chia7712 opened a new pull request #9610: MINOR: remove "gradle wrapper" from travis.yml
chia7712 opened a new pull request #9610: URL: https://github.com/apache/kafka/pull/9610 ``` > Failed to apply plugin [class 'com.github.spotbugs.snom.SpotBugsBasePlugin'] > Gradle version Gradle 5.1.1 is unsupported. Please use Gradle 5.6 or later. ``` ```com.github.spotbugs.snom.SpotBugsBasePlugin``` requires gradle 5.6+ but the gradle supported by travis is 4.0 or 5.1.1 (https://docs.travis-ci.com/user/reference/trusty/#gradle-version and https://docs.travis-ci.com/user/reference/xenial/) However, we don't need to call ```gradle wrapper``` since ```gradlew``` already exists in our project. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ableegoldman commented on pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on pull request #9487: URL: https://github.com/apache/kafka/pull/9487#issuecomment-729475252 Merged to trunk 🥳
[GitHub] [kafka] ableegoldman merged pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman merged pull request #9487: URL: https://github.com/apache/kafka/pull/9487
[GitHub] [kafka] ableegoldman commented on pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on pull request #9487: URL: https://github.com/apache/kafka/pull/9487#issuecomment-729474257 System tests passed, all three Java builds passed. Merging now
[GitHub] [kafka] kowshik commented on pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling
kowshik commented on pull request #9596: URL: https://github.com/apache/kafka/pull/9596#issuecomment-729473539 Thanks for the review @ijuma ! I have addressed the comments in 8716429b48cad8af6ad73109c1d9f7442823c02f .
[GitHub] [kafka] kowshik commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling
kowshik commented on a change in pull request #9596: URL: https://github.com/apache/kafka/pull/9596#discussion_r525850037 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -479,25 +479,33 @@ try { for ((dir, dirJobs) <- jobs) { -dirJobs.foreach(_.get) +val hasErrors = dirJobs.exists { Review comment: That's a really good point. Done. ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -479,25 +479,33 @@ try { for ((dir, dirJobs) <- jobs) { -dirJobs.foreach(_.get) +val hasErrors = dirJobs.exists { + future => Review comment: Done. ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -479,25 +479,33 @@ try { for ((dir, dirJobs) <- jobs) { -dirJobs.foreach(_.get) +val hasErrors = dirJobs.exists { + future => +try { Review comment: Good idea, done.
[jira] [Resolved] (KAFKA-10497) Convert group coordinator metadata schemas to use generated protocol
[ https://issues.apache.org/jira/browse/KAFKA-10497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-10497. Fix Version/s: 2.8.0 Resolution: Fixed > Convert group coordinator metadata schemas to use generated protocol > > > Key: KAFKA-10497 > URL: https://issues.apache.org/jira/browse/KAFKA-10497 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Chia-Ping Tsai >Priority: Major > Fix For: 2.8.0 > > > We need to convert the internal schemas used for representing group metadata > to the generated protocol. This opens the door for flexible version support > on the next bump. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 merged pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 merged pull request #9318: URL: https://github.com/apache/kafka/pull/9318
[jira] [Commented] (KAFKA-9876) Implement Raft Protocol for Metadata Quorum
[ https://issues.apache.org/jira/browse/KAFKA-9876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17234299#comment-17234299 ] feyman commented on KAFKA-9876: --- Hi, [~hachikuji], I'm interested in Raft and also KIP-595, and I would like to get involved in this. Is there any sub-task that I can pick up? I'm currently reading the KIP document/discussion and also the implementation to get familiar. I checked with [~bchen225242] offline; he mentioned that some discussions are not finalized, so I just ask here~ Thanks! > Implement Raft Protocol for Metadata Quorum > --- > > Key: KAFKA-9876 > URL: https://issues.apache.org/jira/browse/KAFKA-9876 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > This tracks the completion of the Raft Protocol specified in KIP-595: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum. > If/when the KIP is approved by the community, we will create smaller > sub-tasks to track overall progress.
[GitHub] [kafka] dongjinleekr commented on a change in pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup
dongjinleekr commented on a change in pull request #9414: URL: https://github.com/apache/kafka/pull/9414#discussion_r525845117 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java ## @@ -186,19 +186,29 @@ public void shouldReportDirectoryEmpty() throws IOException { @Test public void shouldThrowProcessorStateException() throws IOException { Review comment: Totally agree. :smile:
[GitHub] [kafka] ijuma commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling
ijuma commented on a change in pull request #9596: URL: https://github.com/apache/kafka/pull/9596#discussion_r525835495 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File], try { for ((dir, dirJobs) <- jobs) { -dirJobs.foreach(_.get) +val hasErrors = dirJobs.exists { Review comment: This looks wrong. `exists` short-circuits. I think you want `map` followed by `exists`. ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File], try { for ((dir, dirJobs) <- jobs) { -dirJobs.foreach(_.get) +val hasErrors = dirJobs.exists { + future => Review comment: Nit: this should be in the previous line. ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -479,25 +479,33 @@ class LogManager(logDirs: Seq[File], try { for ((dir, dirJobs) <- jobs) { -dirJobs.foreach(_.get) +val hasErrors = dirJobs.exists { + future => +try { Review comment: You can use `scala.util.Try` to wrap the call and get a `Success` or `Failure`.
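The point about `exists` short-circuiting can be sketched in plain Java (an analogous JDK-only example, not the actual Scala `LogManager` code): collecting a success/failure flag for every future before checking guarantees each shutdown job is awaited, whereas an `anyMatch`/`exists`-style check would stop at the first failure and leave the remaining jobs un-awaited.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ShutdownJobs {
    // Await every job, then report whether any failed. Collecting the
    // success/failure flags BEFORE checking avoids the short-circuit:
    // an anyMatch (like Scala's `exists`) would stop at the first
    // failure and leave the remaining jobs un-awaited.
    static boolean hasErrors(List<CompletableFuture<Void>> jobs) {
        List<Boolean> outcomes = jobs.stream()
            .map(f -> {
                try {
                    f.join();     // wait for this job to finish
                    return false; // completed normally
                } catch (Exception e) {
                    return true;  // completed exceptionally
                }
            })
            .collect(Collectors.toList()); // forces every job to be awaited
        return outcomes.contains(true);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failing =
            CompletableFuture.failedFuture(new RuntimeException("dir error"));
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        System.out.println(hasErrors(List.of(failing, ok))); // prints "true"
        System.out.println(hasErrors(List.of(ok)));          // prints "false"
    }
}
```

The Scala equivalent of the final suggestion would wrap each `_.get` in `scala.util.Try` and then test the collected results for a `Failure`.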
[GitHub] [kafka] dongjinleekr commented on a change in pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup
dongjinleekr commented on a change in pull request #9414: URL: https://github.com/apache/kafka/pull/9414#discussion_r525835354 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java ## @@ -140,9 +140,6 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE IntegrationTestUtils.produceSynchronously(producerConfig, false, input, Optional.empty(), singletonList(new KeyValueTimestamp<>("k1", "v1", 0L))); -TestUtils.waitForCondition(stateDir::exists, -"Failed awaiting CreateTopics first request failure"); Review comment: - Previous: The test asserts that the (empty) StateStore directory is not deleted. - Now: The empty StateStore directory is deleted in the cleanup process, so this assertion is no longer valid. (wait, would it be much better to negate the logical condition instead of removing it?)
[jira] [Updated] (KAFKA-10737) MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster in README.md
[ https://issues.apache.org/jira/browse/KAFKA-10737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10737: --- Description: As MM2 is adopted by more users, new and advanced use cases require Kafka clusters to be secure, and one way to make this happen is an SSL-enabled cluster. Currently there is no clear doc on how to configure MM2 so that it can consume from or produce to an SSL-enabled cluster. Some users spent quite an amount of time finding the right config (see https://issues.apache.org/jira/browse/KAFKA-10704). So it would be great to clearly document this. was: As MM2 is adopted by more users, new and advanced use cases require Kafka clusters to be secure, and one way to make this happen is an SSL-enabled cluster. Currently there is no clear doc on how to configure MM2 so that it can consume from or produce to an SSL-enabled cluster. Some users spent quite an amount of time finding the right config (see ). So it would be great to clearly document this. > MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster > in README.md > -- > > Key: KAFKA-10737 > URL: https://issues.apache.org/jira/browse/KAFKA-10737 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.7.0 >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Minor > > As MM2 is adopted by more users, new and advanced use cases require Kafka > clusters to be secure, and one way to make this happen is an SSL-enabled cluster. > Currently there is no clear doc on how to configure MM2 so that it can > consume from or produce to an SSL-enabled cluster. Some users spent quite > an amount of time finding the right config (see > https://issues.apache.org/jira/browse/KAFKA-10704). So it would be great to > clearly document this.
[jira] [Created] (KAFKA-10737) MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster in README.md
Ning Zhang created KAFKA-10737: -- Summary: MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster in README.md Key: KAFKA-10737 URL: https://issues.apache.org/jira/browse/KAFKA-10737 Project: Kafka Issue Type: Improvement Components: mirrormaker Affects Versions: 2.7.0 Reporter: Ning Zhang Assignee: Ning Zhang As MM2 is adopted by more users, new and advanced use cases require Kafka clusters to be secure, and one way to make this happen is an SSL-enabled cluster. Currently there is no clear doc on how to configure MM2 so that it can consume from or produce to an SSL-enabled cluster. Some users spent quite an amount of time finding the right config (see ). So it would be great to clearly document this.
[jira] [Resolved] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang resolved KAFKA-10704. Resolution: Resolved > Mirror maker with TLS at target > --- > > Key: KAFKA-10704 > URL: https://issues.apache.org/jira/browse/KAFKA-10704 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.6.0 >Reporter: Tushar Bhasme >Assignee: Ning Zhang >Priority: Critical > > We need to setup mirror maker from a single node kafka cluster to a three > node Strimzi cluster. There is no SSL setup at source, however the target > cluster is configured with MTLS. > With below config, commands from source like listing topics etc are working: > {code:java} > cat client-ssl.properties > security.protocol=SSL > ssl.truststore.location=my.truststore > ssl.truststore.password=123456 > ssl.keystore.location=my.keystore > ssl.keystore.password=123456 > ssl.key.password=password{code} > However, we are not able to get mirror maker working with the similar configs: > {code:java} > source.security.protocol=PLAINTEXT > target.security.protocol=SSL > target.ssl.truststore.location=my.truststore > target.ssl.truststore.password=123456 > target.ssl.keystore.location=my.keystore > target.ssl.keystore.password=123456 > target.ssl.key.password=password{code} > Errors while running mirror maker: > {code:java} > org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, > deadlineMs=1605011994642, tries=1, nextAllowedTryMs=1605011994743) timed out > at 1605011994643 after 1 attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting > for a node assignment. 
Call: fetchMetadata > [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 > unregistered (org.apache.kafka.common.utils.AppInfoParser:83) > [2020-11-10 12:40:24,643] INFO [AdminClient clientId=adminclient-8] Metadata > update failed > (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235) > org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, > deadlineMs=1605012024643, tries=1, nextAllowedTryMs=-9223372036854775709) > timed out at 9223372036854775807 after 1attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient > thread has exited. Call: fetchMetadata > [2020-11-10 12:40:24,644] INFO Metrics scheduler closed > (org.apache.kafka.common.metrics.Metrics:668) > [2020-11-10 12:40:24,644] INFO Closing reporter > org.apache.kafka.common.metrics.JmxReporter > (org.apache.kafka.common.metrics.Metrics:672) > [2020-11-10 12:40:24,644] INFO Metrics reporters closed > (org.apache.kafka.common.metrics.Metrics:678) > [2020-11-10 12:40:24,645] ERROR Stopping due to error > (org.apache.kafka.connect.mirror.MirrorMaker:304) > org.apache.kafka.connect.errors.ConnectException: Failed to connect to and > describe Kafka cluster. Check worker's broker connection and security > properties. 
> at > org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70) > at > org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51) > at > org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:235) > at > org.apache.kafka.connect.mirror.MirrorMaker.lambda$new$1(MirrorMaker.java:136) > at java.lang.Iterable.forEach(Iterable.java:75) > at > org.apache.kafka.connect.mirror.MirrorMaker.(MirrorMaker.java:136) > at > org.apache.kafka.connect.mirror.MirrorMaker.(MirrorMaker.java:148) > at > org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:291) > Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, > deadlineMs=1605012024641, tries=1, nextAllowedTryMs=1605012024742)timed out > at 1605012024642 after 1 attempt(s) > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64) > ... 7 more > Caused by: org.apache.kafka.common.errors.TimeoutException: > Call(callName=listNodes, deadlineMs=1605012024641, tries=1, > nextAllowedTryMs=1605012024742) timed out at 1605012024642 after 1 attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting > for a node assignment. Call: listNodes > {code} -- This message
[jira] [Reopened] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reopened KAFKA-10704: > Mirror maker with TLS at target > --- > > Key: KAFKA-10704 > URL: https://issues.apache.org/jira/browse/KAFKA-10704 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.6.0 >Reporter: Tushar Bhasme >Assignee: Ning Zhang >Priority: Critical > Fix For: 2.7.0
[jira] [Updated] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10704: --- Fix Version/s: (was: 2.7.0) > Mirror maker with TLS at target > --- > > Key: KAFKA-10704 > URL: https://issues.apache.org/jira/browse/KAFKA-10704 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.6.0 >Reporter: Tushar Bhasme >Assignee: Ning Zhang >Priority: Critical
[jira] [Commented] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17234271#comment-17234271 ] Ning Zhang commented on KAFKA-10704: Indeed, we should clearly document how to produce to an SSL-enabled cluster. I will create a PR against [https://github.com/apache/kafka/blob/trunk/connect/mirror/README.md] to add a section about SSL-enabled clusters > Mirror maker with TLS at target > --- > > Key: KAFKA-10704 > URL: https://issues.apache.org/jira/browse/KAFKA-10704 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.6.0 >Reporter: Tushar Bhasme >Assignee: Ning Zhang >Priority: Critical > Fix For: 2.7.0 > > > We need to setup mirror maker from a single node kafka cluster to a three > node Strimzi cluster. There is no SSL setup at source, however the target > cluster is configured with MTLS. > With below config, commands from source like listing topics etc are working: > {code:java} > cat client-ssl.properties > security.protocol=SSL > ssl.truststore.location=my.truststore > ssl.truststore.password=123456 > ssl.keystore.location=my.keystore > ssl.keystore.password=123456 > ssl.key.password=password{code} > However, we are not able to get mirror maker working with the similar configs: > {code:java} > source.security.protocol=PLAINTEXT > target.security.protocol=SSL > target.ssl.truststore.location=my.truststore > target.ssl.truststore.password=123456 > target.ssl.keystore.location=my.keystore > target.ssl.keystore.password=123456 > target.ssl.key.password=password{code} > Errors while running mirror maker: > {code:java} > org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, > deadlineMs=1605011994642, tries=1, nextAllowedTryMs=1605011994743) timed out > at 1605011994643 after 1 attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting > for a node assignment. 
Call: fetchMetadata > [remainder of the stack trace elided; it is identical to the log quoted at the top of this digest] {code}
[jira] [Assigned] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reassigned KAFKA-10704: -- Assignee: Ning Zhang > Mirror maker with TLS at target > [issue description and stack trace elided; identical to those quoted in the comment notification above]
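The SSL section Ning proposes for the MirrorMaker 2 README would presumably look something along these lines. This is a sketch only, assuming MM2's cluster-alias property-prefix convention; the aliases, hostnames, and store paths are illustrative and not taken from the eventual PR:

```properties
# mm2.properties — illustrative sketch, not the exact text of the documentation change
clusters = source, target
source.bootstrap.servers = source-kafka:9092
target.bootstrap.servers = my-passive-cluster.com:443

# no TLS toward the source cluster
source.security.protocol = PLAINTEXT

# mTLS toward the target cluster; depending on the version, the same settings
# may also need to be repeated per client type (e.g. target.admin.ssl.*)
target.security.protocol = SSL
target.ssl.truststore.location = my.truststore
target.ssl.truststore.password = 123456
target.ssl.keystore.location = my.keystore
target.ssl.keystore.password = 123456
target.ssl.key.password = password
```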
[jira] [Commented] (KAFKA-10728) Mirroring data without decompressing with MirrorMaker 2.0
[ https://issues.apache.org/jira/browse/KAFKA-10728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17234255#comment-17234255 ] Ning Zhang commented on KAFKA-10728: my first impression is: unless the producer of MM2 is explicitly set to use `uncompressed` with [https://kafka.apache.org/documentation/#compression.type] it will use the default compression value > Mirroring data without decompressing with MirrorMaker 2.0 > - > > Key: KAFKA-10728 > URL: https://issues.apache.org/jira/browse/KAFKA-10728 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Eazhilan Nagarajan >Priority: Major > > Hello, > > I use MirrorMaker 2.0 to copy data across two Kafka clusters and it's all > working fine. Recently we enabled compression while producing data into any > topic, which had a very positive impact on storage and other resources, but > while mirroring, the data seems to be decompressed at the target Kafka > cluster. I tried enabling compression using the below config in MM2; the data > at the target cluster is compressed now, but the decompress-and-recompress > cycle continues to happen and it eats up a lot of resources unnecessarily. > > {noformat} > - alias: my-passive-cluster > authentication: > passwordSecret: > password: password > secretName: passive-cluster-secret > type: scram-sha-512 > username: user-1 > bootstrapServers: my-passive-cluster.com:443 > config: > config.storage.replication.factor: 3 > offset.storage.replication.factor: 3 > status.storage.replication.factor: 3 > producer.compression.type: gzip{noformat} > I found a couple of Jira issues talking about it, but I don't know if the > shallow iterator option is available now. > https://issues.apache.org/jira/browse/KAFKA-732, > https://issues.apache.org/jira/browse/KAFKA-845 > > Kindly let me know if this is currently available or if it'll be available in > the future. -- This message was sent by Atlassian Jira (v8.3.4#803005)
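Ning's point above — that the MM2 producer falls back to the default compression value unless it is set explicitly — can be pinned per target cluster in the MM2 driver config. A sketch under the assumption that the `{clusterAlias}.producer.{config}` override convention applies; the aliases are illustrative:

```properties
# mm2.properties — illustrative sketch
clusters = active, passive
# compress on the MM2 producer that writes into the passive cluster, so
# records are re-compressed with gzip rather than written uncompressed
passive.producer.compression.type = gzip
```

Note that this avoids storing uncompressed data at the target, but not the decompress/re-compress CPU cost; avoiding that would require the shallow-iteration feature discussed in KAFKA-732 and KAFKA-845.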
[GitHub] [kafka] ableegoldman opened a new pull request #9609: KAFKA-6687: restrict DSL to allow only Streams from the same source topics
ableegoldman opened a new pull request #9609: URL: https://github.com/apache/kafka/pull/9609 Followup to https://github.com/apache/kafka/pull/9582 Will leave the ability to create multiple KTables from the same source topic as followup work. Similarly, creating a KStream and a KTable from the same topic can be tackled later if need be This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9604: [MINOR] adjust the log level to error
chia7712 commented on a change in pull request #9604: URL: https://github.com/apache/kafka/pull/9604#discussion_r525799307 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -477,7 +477,7 @@ class KafkaApis(val requestChannel: RequestChannel, val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) => topicPartition -> status.error.exceptionName }.mkString(", ") - info( + error( Review comment: Is this message a duplicate of https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L613 ? If so, is it better to remove one of them?
[GitHub] [kafka] chia7712 commented on pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request
chia7712 commented on pull request #9435: URL: https://github.com/apache/kafka/pull/9435#issuecomment-729399596 @Lincong Could you take a look at those failed tests? I will give a review later :)
[GitHub] [kafka] ableegoldman commented on pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on pull request #9487: URL: https://github.com/apache/kafka/pull/9487#issuecomment-729396557 System test run (still running but so far it's all PASS) -- https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4292/
[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests
abbccdda commented on a change in pull request #9564: URL: https://github.com/apache/kafka/pull/9564#discussion_r525764228 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ## @@ -178,6 +183,11 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient, } } + private def isTimedOut(response: ClientResponse): Boolean = { +val requestCreatedTime = response.receivedTimeMs() - response.requestLatencyMs() Review comment: Is it legitimate to compare with requestTimeout here since we actually measure the request buffered time on the broker-to-controller channel queue? Should we introduce a new timeout config here somehow?
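The quantity being debated in this review thread can be seen in a self-contained form: reconstruct the request creation time from the response's receive time and its measured latency, then compare the request's age against a timeout. The class and method names here are invented for illustration and are not the PR's actual code:

```java
// Hypothetical, self-contained sketch of the timed-out check discussed above.
public class ForwardTimeoutSketch {

    // receivedTimeMs - requestLatencyMs recovers when the request was created;
    // the request is considered timed out once its age exceeds the timeout.
    static boolean isTimedOut(long receivedTimeMs, long requestLatencyMs,
                              long nowMs, long requestTimeoutMs) {
        long requestCreatedTimeMs = receivedTimeMs - requestLatencyMs;
        return nowMs - requestCreatedTimeMs > requestTimeoutMs;
    }

    public static void main(String[] args) {
        // created at t=1000 (received 1600, latency 600); 500 ms timeout
        System.out.println(isTimedOut(1600L, 600L, 1600L, 500L)); // prints "true"
        System.out.println(isTimedOut(1600L, 600L, 1400L, 500L)); // prints "false"
    }
}
```

The reviewer's concern is that `requestLatencyMs` also includes time the request spent buffered in the broker-to-controller queue, so comparing against the plain request timeout may conflate queueing delay with network delay.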
[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests
abbccdda commented on a change in pull request #9564: URL: https://github.com/apache/kafka/pull/9564#discussion_r525763121 ## File path: clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java ## @@ -24,4 +24,11 @@ void onComplete(ClientResponse response); +/** + * Fire when the request transmission hits a fatal exception. + * + * @param exception the thrown exception + */ +default void onFailure(RuntimeException exception) { Review comment: I see, if this is the case, we need a customized completion handler for both forwarding and AlterISR IMHO.
[GitHub] [kafka] abbccdda merged pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH
abbccdda merged pull request #9569: URL: https://github.com/apache/kafka/pull/9569
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525734417 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { return close(timeoutMs); } -private boolean close(final long timeoutMs) { -if (!setState(State.PENDING_SHUTDOWN)) { -// if transition failed, it means it was either in PENDING_SHUTDOWN -// or NOT_RUNNING already; just check that all threads have been stopped -log.info("Already in the pending shutdown state, wait to complete shutdown"); -} else { -stateDirCleaner.shutdownNow(); -if (rocksDBMetricsRecordingService != null) { -rocksDBMetricsRecordingService.shutdownNow(); -} +private Thread shutdownHelper(final boolean error) { +stateDirCleaner.shutdownNow(); +if (rocksDBMetricsRecordingService != null) { +rocksDBMetricsRecordingService.shutdownNow(); +} -// wait for all threads to join in a separate thread; -// save the current thread so that if it is a stream thread -// we don't attempt to join it and cause a deadlock -final Thread shutdownThread = new Thread(() -> { -// notify all the threads to stop; avoid deadlocks by stopping any -// further state reports from the thread since we're shutting down -for (final StreamThread thread : threads) { -thread.shutdown(); -} +// wait for all threads to join in a separate thread; +// save the current thread so that if it is a stream thread +// we don't attempt to join it and cause a deadlock +return new Thread(() -> { +// notify all the threads to stop; avoid deadlocks by stopping any +// further state reports from the thread since we're shutting down +for (final StreamThread thread : threads) { +thread.shutdown(); +} -for (final StreamThread thread : threads) { -try { -if (!thread.isRunning()) { -thread.join(); -} -} catch (final InterruptedException ex) { -Thread.currentThread().interrupt(); +for (final StreamThread 
thread : threads) { +try { +if (!thread.isRunning()) { +thread.join(); } +} catch (final InterruptedException ex) { +Thread.currentThread().interrupt(); } +} -if (globalStreamThread != null) { -globalStreamThread.shutdown(); -} +if (globalStreamThread != null) { +globalStreamThread.shutdown(); +} -if (globalStreamThread != null && !globalStreamThread.stillRunning()) { -try { -globalStreamThread.join(); -} catch (final InterruptedException e) { -Thread.currentThread().interrupt(); -} -globalStreamThread = null; +if (globalStreamThread != null && !globalStreamThread.stillRunning()) { +try { +globalStreamThread.join(); +} catch (final InterruptedException e) { +Thread.currentThread().interrupt(); } +globalStreamThread = null; +} -adminClient.close(); +adminClient.close(); -streamsMetrics.removeAllClientLevelMetrics(); -metrics.close(); +streamsMetrics.removeAllClientLevelMetrics(); +metrics.close(); +if (!error) { setState(State.NOT_RUNNING); -}, "kafka-streams-close-thread"); +} +}, "kafka-streams-close-thread"); +} + +private boolean close(final long timeoutMs) { +if (!setState(State.PENDING_SHUTDOWN)) { Review comment: WDYT about having both NOT_RUNNING and ERROR go through PENDING_SHUTDOWN, rather than just transitioning directly and permanently to ERROR? At a high level I think it just makes sense for ERROR and NOT_RUNNING to be symmetric. Also any benefit to having an intermediate PENDING_SHUTDOWN for the NOT_RUNNING case presumably applies to the ERROR case as well. eg, it indicates whether Streams has completed its shutdown or not: users know that an app in PENDING_SHUTDOWN should never be killed, its only safe to do so once it reaches NOT_RUNNING. We should provide the same fun
[GitHub] [kafka] showuon commented on pull request #9507: KAFKA-10628: remove all the unnecessary parameters from the tests which are using TopologyTestDriver
showuon commented on pull request #9507: URL: https://github.com/apache/kafka/pull/9507#issuecomment-729351004 @vvcephei @chia7712 , please help review. Thanks.
[GitHub] [kafka] showuon commented on pull request #9104: KAFKA-10266: Update the connector config header.converter
showuon commented on pull request #9104: URL: https://github.com/apache/kafka/pull/9104#issuecomment-729350784 @kkonstantine , please help review. Thanks.
[GitHub] [kafka] jacky1193610322 commented on pull request #9100: Add AlterISR RPC and use it for ISR modifications
jacky1193610322 commented on pull request #9100: URL: https://github.com/apache/kafka/pull/9100#issuecomment-729349575 Thanks for your reply. I have also read KIP-500 and KIP-631; the fencing story there is good, but it will not be released for a few months. Before that, I think we still need to do our best to fence a broker once the controller already considers it dead. In other words, we should fence in both directions. ` self-fence after getting an invalid version error from AlterIsr ` yes, I think we need to self-fence when the session is lost; we can't rely on receiving the other machine's response, because no response arrives when the Broker2ControllerChannel is broken. Please let me know if you create a Jira.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525701691 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * The handler will execute on the thread that produced the exception. + * In order to get the thread that threw the exception, Thread.currentThread(). + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * @throws NullPointerException if streamsUncaughtExceptionHandler is null. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final Consumer handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler); +synchronized (stateLock) { +if (state == State.CREATED) { +Objects.requireNonNull(streamsUncaughtExceptionHandler); +for (final StreamThread thread : threads) { +thread.setStreamsUncaughtExceptionHandler(handler); +} +if (globalStreamThread != null) { +globalStreamThread.setUncaughtExceptionHandler(handler); +} +} else { +throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. 
" + +"Current state is: " + state); +} +} +} + +private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) { +if (oldHandler) { +if (throwable instanceof RuntimeException) { +throw (RuntimeException) throwable; +} else if (throwable instanceof Error) { +throw (Error) throwable; +} else { +throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable); +} +} else { +handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT); +} +} + +private void handleStreamsUncaughtException(final Throwable throwable, +final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable); +if (oldHandler) { +log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." + +"The old handler will be ignored as long as a new handler is set."); +} +switch (action) { +case SHUTDOWN_CLIENT: +log.error("Encountered the following exception during processing " + +"and the registered exception handler opted to " + action + "." + +" The streams client is going to shut down now. ", throwable); +close(Duration.ZERO); +break; +case SHUTDOWN_APPLICATION: +if (throwable instanceof Error) { +log.error("This option requires running threads to shut down the application." + +"but the uncaught exception was an Error, which means this runtime is no " + +"longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable); +} +if (Thread.currentThread().equals(globalStreamThread) && threads.stream().noneMatch(StreamThread::isRunning)) { +log.error("Exception in global thread caused the application to attempt to shutdown." + +" This action will succeed only if there is at least one StreamThread running on this client." + +" Currently there are no running threads so will now close the client."); +close(Duration.ZERO); Review comment: That's fair. 
I guess I was thinking less about the inherent meaning of ERROR vs NOT_RUNNING, and mor
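The action handling being reviewed in this thread can be sketched in isolation. This is a hypothetical, self-contained model: the enum constants mirror the responses proposed for the streams uncaught exception handler, but the dispatch method, its return strings, and the class name are invented for illustration and are not the PR's code:

```java
import java.util.function.Function;

public class HandlerDispatchSketch {

    // Mirrors the proposed handler responses discussed in the review.
    enum Action { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Ask the user-supplied handler what to do with the throwable, then
    // return a description of the reaction the client would take.
    static String dispatch(Throwable t, Function<Throwable, Action> handler) {
        switch (handler.apply(t)) {
            case SHUTDOWN_CLIENT:
                // in the PR this path closes the client, e.g. close(Duration.ZERO)
                return "closing this client";
            case SHUTDOWN_APPLICATION:
                // per the review, this only succeeds if at least one
                // stream thread is still running to relay the shutdown
                return "signalling all instances to shut down";
            case REPLACE_THREAD:
                return "replacing the failed stream thread";
            default:
                throw new IllegalStateException("unhandled action");
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new RuntimeException("boom"),
                e -> Action.SHUTDOWN_CLIENT)); // prints "closing this client"
    }
}
```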
[jira] [Updated] (KAFKA-10736) Convert transaction coordinator metadata schemas to use generated protocol
[ https://issues.apache.org/jira/browse/KAFKA-10736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10736: --- Description: We need to convert the internal schemas used for representing transaction metadata to the generated protocol. This opens the door for flexible version support on the next bump. similar to https://issues.apache.org/jira/browse/KAFKA-10497 was: We need to convert the internal schemas used for representing transaction metadata to the generated protocol. This opens the door for flexible version support on the next bump. similar to KAFKA-10947 > Convert transaction coordinator metadata schemas to use generated protocol > -- > > Key: KAFKA-10736 > URL: https://issues.apache.org/jira/browse/KAFKA-10736 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > We need to convert the internal schemas used for representing transaction > metadata to the generated protocol. This opens the door for flexible version > support on the next bump. > similar to https://issues.apache.org/jira/browse/KAFKA-10497
[jira] [Updated] (KAFKA-10736) Convert transaction coordinator metadata schemas to use generated protocol
[ https://issues.apache.org/jira/browse/KAFKA-10736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10736: --- Description: We need to convert the internal schemas used for representing transaction metadata to the generated protocol. This opens the door for flexible version support on the next bump. similar to KAFKA-10947 > Convert transaction coordinator metadata schemas to use generated protocol > -- > > Key: KAFKA-10736 > URL: https://issues.apache.org/jira/browse/KAFKA-10736 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > We need to convert the internal schemas used for representing transaction > metadata to the generated protocol. This opens the door for flexible version > support on the next bump. > similar to KAFKA-10947
[jira] [Updated] (KAFKA-10497) Convert group/transaction coordinator metadata schemas to use generated protocol
[ https://issues.apache.org/jira/browse/KAFKA-10497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10497: --- Description: We need to convert the internal schemas used for representing group metadata to the generated protocol. This opens the door for flexible version support on the next bump. (was: We need to convert the internal schemas used for representing transaction/group metadata to the generated protocol. This opens the door for flexible version support on the next bump. ) > Convert group/transaction coordinator metadata schemas to use generated > protocol > > > Key: KAFKA-10497 > URL: https://issues.apache.org/jira/browse/KAFKA-10497 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Chia-Ping Tsai >Priority: Major > > We need to convert the internal schemas used for representing group metadata > to the generated protocol. This opens the door for flexible version support > on the next bump.
[jira] [Updated] (KAFKA-10497) Convert group coordinator metadata schemas to use generated protocol
[ https://issues.apache.org/jira/browse/KAFKA-10497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10497: --- Summary: Convert group coordinator metadata schemas to use generated protocol (was: Convert group/transaction coordinator metadata schemas to use generated protocol) > Convert group coordinator metadata schemas to use generated protocol > > > Key: KAFKA-10497 > URL: https://issues.apache.org/jira/browse/KAFKA-10497 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Chia-Ping Tsai >Priority: Major > > We need to convert the internal schemas used for representing group metadata > to the generated protocol. This opens the door for flexible version support > on the next bump.
[jira] [Created] (KAFKA-10736) Convert transaction coordinator metadata schemas to use generated protocol
Chia-Ping Tsai created KAFKA-10736: -- Summary: Convert transaction coordinator metadata schemas to use generated protocol Key: KAFKA-10736 URL: https://issues.apache.org/jira/browse/KAFKA-10736 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525692960 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { return close(timeoutMs); } -private boolean close(final long timeoutMs) { -if (!setState(State.PENDING_SHUTDOWN)) { -// if transition failed, it means it was either in PENDING_SHUTDOWN -// or NOT_RUNNING already; just check that all threads have been stopped -log.info("Already in the pending shutdown state, wait to complete shutdown"); -} else { -stateDirCleaner.shutdownNow(); -if (rocksDBMetricsRecordingService != null) { -rocksDBMetricsRecordingService.shutdownNow(); -} +private Thread shutdownHelper(final boolean error) { +stateDirCleaner.shutdownNow(); +if (rocksDBMetricsRecordingService != null) { +rocksDBMetricsRecordingService.shutdownNow(); +} -// wait for all threads to join in a separate thread; -// save the current thread so that if it is a stream thread -// we don't attempt to join it and cause a deadlock -final Thread shutdownThread = new Thread(() -> { -// notify all the threads to stop; avoid deadlocks by stopping any -// further state reports from the thread since we're shutting down -for (final StreamThread thread : threads) { -thread.shutdown(); -} +// wait for all threads to join in a separate thread; +// save the current thread so that if it is a stream thread +// we don't attempt to join it and cause a deadlock +return new Thread(() -> { +// notify all the threads to stop; avoid deadlocks by stopping any +// further state reports from the thread since we're shutting down +for (final StreamThread thread : threads) { +thread.shutdown(); +} -for (final StreamThread thread : threads) { -try { -if (!thread.isRunning()) { -thread.join(); -} -} catch (final InterruptedException ex) { -Thread.currentThread().interrupt(); +for (final StreamThread 
thread : threads) { +try { +if (!thread.isRunning()) { +thread.join(); } +} catch (final InterruptedException ex) { +Thread.currentThread().interrupt(); } +} -if (globalStreamThread != null) { -globalStreamThread.shutdown(); -} +if (globalStreamThread != null) { +globalStreamThread.shutdown(); +} Review comment: Eh, I wouldn't bother with an AK ticket if this will be tackled in the next PR. I'll just make a list of all the minor followup work somewhere to keep track This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol
chia7712 commented on a change in pull request #9401: URL: https://github.com/apache/kafka/pull/9401#discussion_r525690436 ## File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java ## @@ -100,16 +113,92 @@ public void produceResponseRecordErrorsTest() { ProduceResponse response = new ProduceResponse(responseData); Struct struct = response.toStruct(ver); assertEquals("Should use schema version " + ver, ApiKeys.PRODUCE.responseSchema(ver), struct.schema()); -ProduceResponse.PartitionResponse deserialized = new ProduceResponse(struct).responses().get(tp); +ProduceResponse.PartitionResponse deserialized = new ProduceResponse(new ProduceResponseData(struct, ver)).responses().get(tp); if (ver >= 8) { assertEquals(1, deserialized.recordErrors.size()); assertEquals(3, deserialized.recordErrors.get(0).batchIndex); assertEquals("Record error", deserialized.recordErrors.get(0).message); assertEquals("Produce failed", deserialized.errorMessage); } else { assertEquals(0, deserialized.recordErrors.size()); -assertEquals(null, deserialized.errorMessage); +assertNull(deserialized.errorMessage); } } } + +/** + * the schema in this test is from previous code and the automatic protocol should be compatible to previous schema. + */ +@Test +public void testCompatibility() { Review comment: That makes sense to me. Will remove redundant test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525686843 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ## @@ -311,6 +314,8 @@ public void run() { "Updating global state failed. You can restart KafkaStreams to recover from this error.", recoverableException Review comment: Like in StreamThread, we can just add a call to the handler.
[GitHub] [kafka] chia7712 commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol
chia7712 commented on a change in pull request #9401: URL: https://github.com/apache/kafka/pull/9401#discussion_r525683594 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ## @@ -560,13 +562,24 @@ private void handleProduceResponse(ClientResponse response, Map https://issues.apache.org/jira/browse/KAFKA-10696 Review comment: Copy that.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525681642 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { return close(timeoutMs); } -private boolean close(final long timeoutMs) { -if (!setState(State.PENDING_SHUTDOWN)) { -// if transition failed, it means it was either in PENDING_SHUTDOWN -// or NOT_RUNNING already; just check that all threads have been stopped -log.info("Already in the pending shutdown state, wait to complete shutdown"); -} else { -stateDirCleaner.shutdownNow(); -if (rocksDBMetricsRecordingService != null) { -rocksDBMetricsRecordingService.shutdownNow(); -} +private Thread shutdownHelper(final boolean error) { +stateDirCleaner.shutdownNow(); +if (rocksDBMetricsRecordingService != null) { +rocksDBMetricsRecordingService.shutdownNow(); +} -// wait for all threads to join in a separate thread; -// save the current thread so that if it is a stream thread -// we don't attempt to join it and cause a deadlock -final Thread shutdownThread = new Thread(() -> { -// notify all the threads to stop; avoid deadlocks by stopping any -// further state reports from the thread since we're shutting down -for (final StreamThread thread : threads) { -thread.shutdown(); -} +// wait for all threads to join in a separate thread; +// save the current thread so that if it is a stream thread +// we don't attempt to join it and cause a deadlock +return new Thread(() -> { +// notify all the threads to stop; avoid deadlocks by stopping any +// further state reports from the thread since we're shutting down +for (final StreamThread thread : threads) { +thread.shutdown(); +} -for (final StreamThread thread : threads) { -try { -if (!thread.isRunning()) { -thread.join(); -} -} catch (final InterruptedException ex) { -Thread.currentThread().interrupt(); +for (final StreamThread 
thread : threads) { +try { +if (!thread.isRunning()) { +thread.join(); } +} catch (final InterruptedException ex) { +Thread.currentThread().interrupt(); } +} -if (globalStreamThread != null) { -globalStreamThread.shutdown(); -} +if (globalStreamThread != null) { +globalStreamThread.shutdown(); +} -if (globalStreamThread != null && !globalStreamThread.stillRunning()) { -try { -globalStreamThread.join(); -} catch (final InterruptedException e) { -Thread.currentThread().interrupt(); -} -globalStreamThread = null; +if (globalStreamThread != null && !globalStreamThread.stillRunning()) { +try { +globalStreamThread.join(); +} catch (final InterruptedException e) { +Thread.currentThread().interrupt(); } +globalStreamThread = null; +} -adminClient.close(); +adminClient.close(); -streamsMetrics.removeAllClientLevelMetrics(); -metrics.close(); +streamsMetrics.removeAllClientLevelMetrics(); +metrics.close(); +if (!error) { setState(State.NOT_RUNNING); -}, "kafka-streams-close-thread"); +} +}, "kafka-streams-close-thread"); +} + +private boolean close(final long timeoutMs) { +if (!setState(State.PENDING_SHUTDOWN)) { Review comment: This is currently the plan to remove that transition. It is pretty much the only change we plan to make to the FSM. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525680874 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * The handler will execute on the thread that produced the exception. + * In order to get the thread that threw the exception, Thread.currentThread(). + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * @throws NullPointerException if streamsUncaughtExceptionHandler is null. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final Consumer handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler); +synchronized (stateLock) { +if (state == State.CREATED) { +Objects.requireNonNull(streamsUncaughtExceptionHandler); +for (final StreamThread thread : threads) { +thread.setStreamsUncaughtExceptionHandler(handler); +} +if (globalStreamThread != null) { +globalStreamThread.setUncaughtExceptionHandler(handler); +} +} else { +throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. 
" + +"Current state is: " + state); +} +} +} + +private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) { +if (oldHandler) { +if (throwable instanceof RuntimeException) { +throw (RuntimeException) throwable; +} else if (throwable instanceof Error) { +throw (Error) throwable; +} else { +throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable); +} +} else { +handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT); +} +} + +private void handleStreamsUncaughtException(final Throwable throwable, +final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable); +if (oldHandler) { +log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." + +"The old handler will be ignored as long as a new handler is set."); +} +switch (action) { +case SHUTDOWN_CLIENT: +log.error("Encountered the following exception during processing " + +"and the registered exception handler opted to " + action + "." + +" The streams client is going to shut down now. ", throwable); +close(Duration.ZERO); +break; +case SHUTDOWN_APPLICATION: +if (throwable instanceof Error) { +log.error("This option requires running threads to shut down the application." + +"but the uncaught exception was an Error, which means this runtime is no " + +"longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable); +} +if (Thread.currentThread().equals(globalStreamThread) && threads.stream().noneMatch(StreamThread::isRunning)) { +log.error("Exception in global thread caused the application to attempt to shutdown." + +" This action will succeed only if there is at least one StreamThread running on this client." 
+ +" Currently there are no running threads so will now close the client."); +close(Duration.ZERO); Review comment: I am on the fence about this. I do think its would be consistent to be not running but also it did shut
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525678234 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { return close(timeoutMs); } -private boolean close(final long timeoutMs) { -if (!setState(State.PENDING_SHUTDOWN)) { -// if transition failed, it means it was either in PENDING_SHUTDOWN -// or NOT_RUNNING already; just check that all threads have been stopped -log.info("Already in the pending shutdown state, wait to complete shutdown"); -} else { -stateDirCleaner.shutdownNow(); -if (rocksDBMetricsRecordingService != null) { -rocksDBMetricsRecordingService.shutdownNow(); -} +private Thread shutdownHelper(final boolean error) { +stateDirCleaner.shutdownNow(); +if (rocksDBMetricsRecordingService != null) { +rocksDBMetricsRecordingService.shutdownNow(); +} -// wait for all threads to join in a separate thread; -// save the current thread so that if it is a stream thread -// we don't attempt to join it and cause a deadlock -final Thread shutdownThread = new Thread(() -> { -// notify all the threads to stop; avoid deadlocks by stopping any -// further state reports from the thread since we're shutting down -for (final StreamThread thread : threads) { -thread.shutdown(); -} +// wait for all threads to join in a separate thread; +// save the current thread so that if it is a stream thread +// we don't attempt to join it and cause a deadlock +return new Thread(() -> { +// notify all the threads to stop; avoid deadlocks by stopping any +// further state reports from the thread since we're shutting down +for (final StreamThread thread : threads) { +thread.shutdown(); +} -for (final StreamThread thread : threads) { -try { -if (!thread.isRunning()) { -thread.join(); -} -} catch (final InterruptedException ex) { -Thread.currentThread().interrupt(); +for (final StreamThread 
thread : threads) { +try { +if (!thread.isRunning()) { +thread.join(); } +} catch (final InterruptedException ex) { +Thread.currentThread().interrupt(); } +} -if (globalStreamThread != null) { -globalStreamThread.shutdown(); -} +if (globalStreamThread != null) { +globalStreamThread.shutdown(); +} Review comment: You are right, I think. I just copied from the normal close method because I knew it worked. In a follow-up we can maybe change both of these. Do you think that there should be an AK ticket to track it?
[jira] [Created] (KAFKA-10735) Kafka producer producing corrupted avro values when confluent cluster is recreated and producer application is not restarted
Tim Tattersall created KAFKA-10735:
Summary: Kafka producer producing corrupted avro values when confluent cluster is recreated and producer application is not restarted
Key: KAFKA-10735
URL: https://issues.apache.org/jira/browse/KAFKA-10735
Project: Kafka
Issue Type: Bug
Reporter: Tim Tattersall

*Our Environment (AWS):*
1 x EC2 instance running 4 docker containers (using docker-compose)
* cp-kafka 5.5.1
* cp-zookeeper 5.5.1
* cp-schema-registry 5.5.1
* cp-enterprise-control-center 5.5.1
1 x ECS service running a single java application with spring-kafka producer
Topics are using String key and Avro value

*Problem:*
* Avro values published after confluent cluster is recreated are corrupted. Expecting Avro json structure, received string value with corrupted Avro details
** Expected: {"metadata":{"nabEventVersion":"1.0","type":"Kafka IBMMQ sink connector","schemaUrl": ...*ongoing*
** Actual: 1.08Kafka IBMMQ source connector^kafka-conector-ibm-mq-source-entitlements-check\Kafka IBMMQ source connector - sourced*ongoing*

*How to Reproduce*
# Using an existing confluent cluster
# Start a kafka producer java application (ours running with spring-kafka)
# Destroy the existing confluent cluster (using docker-compose down)
# Recreate the confluent cluster (using docker-compose up)
# Add the topic back onto the new cluster
# Trigger a message to be produced by the running Kafka producer

*Current Workaround*
* Killing running tasks on ECS service and allowing AWS to start new ones
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525663640 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -559,18 +552,51 @@ void runLoop() { } } catch (final TaskCorruptedException e) { log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " + - "Will close the task as dirty and re-create and bootstrap from scratch.", e); +"Will close the task as dirty and re-create and bootstrap from scratch.", e); try { taskManager.handleCorruption(e.corruptedTaskWithChangelogs()); } catch (final TaskMigratedException taskMigrated) { handleTaskMigrated(taskMigrated); } } catch (final TaskMigratedException e) { handleTaskMigrated(e); +} catch (final UnsupportedVersionException e) { +final String errorMessage = e.getMessage(); +if (errorMessage != null && +errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) { + +log.error("Shutting down because the Kafka cluster seems to be on a too old version. " + Review comment: We should remember to update the wording here when we add the REPLACE_THREAD functionality This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
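The quoted `runLoop` diff detects an incompatible broker by matching a prefix of the exception message. A minimal, hedged sketch of that predicate follows; the helper name is mine, only the prefix string is copied from the diff, and a plain `Exception` stands in for the real `UnsupportedVersionException`:

```java
public class RequireStableCheck {
    // Returns true when the exception message carries the specific prefix the
    // quoted diff looks for; a null message (no detail) never matches.
    public static boolean isRequireStableUnsupported(final Exception e) {
        final String errorMessage = e.getMessage();
        return errorMessage != null &&
            errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ");
    }

    public static void main(String[] args) {
        System.out.println(isRequireStableUnsupported(
            new RuntimeException("Broker unexpectedly doesn't support requireStable flag on version 7"))); // prints true
        System.out.println(isRequireStableUnsupported(new RuntimeException("unrelated failure"))); // prints false
    }
}
```

Matching on message text is brittle (as the review notes, the wording will need updating when REPLACE_THREAD lands), which is exactly why the check is centralized on one constant prefix.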
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525658639 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ## @@ -311,6 +314,8 @@ public void run() { "Updating global state failed. You can restart KafkaStreams to recover from this error.", recoverableException Review comment: Hm, OK, this might be a problem. Since this is thrown from another catch block and not from the try block, it won't be caught by the catch block below and will slip through the exception handler.
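The point raised here, that an exception thrown from inside a catch block bypasses the sibling catch clauses of the same try statement, is plain Java semantics and can be demonstrated in isolation. The class and method names below are illustrative, not Kafka code:

```java
public class CatchBlockEscape {
    // Shows that a throw from within a catch block is NOT handled by a later
    // catch clause of the same try; it propagates to the enclosing scope.
    public static String whoCatches() {
        try {
            try {
                throw new IllegalStateException("failure in try block");
            } catch (final IllegalStateException inner) {
                // Rethrown from a catch block, so the sibling clause below
                // never sees it, mirroring the GlobalStreamThread concern.
                throw new RuntimeException("recoverable", inner);
            } catch (final RuntimeException sibling) {
                return "sibling catch";
            }
        } catch (final RuntimeException outer) {
            return "outer catch";
        }
    }

    public static void main(String[] args) {
        System.out.println(whoCatches()); // prints "outer catch"
    }
}
```

In `GlobalStreamThread` there is no enclosing handler on that path, so without a fix the exception would slip past the uncaught exception handler entirely.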
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525650632 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { return close(timeoutMs); } -private boolean close(final long timeoutMs) { -if (!setState(State.PENDING_SHUTDOWN)) { -// if transition failed, it means it was either in PENDING_SHUTDOWN -// or NOT_RUNNING already; just check that all threads have been stopped -log.info("Already in the pending shutdown state, wait to complete shutdown"); -} else { -stateDirCleaner.shutdownNow(); -if (rocksDBMetricsRecordingService != null) { -rocksDBMetricsRecordingService.shutdownNow(); -} +private Thread shutdownHelper(final boolean error) { +stateDirCleaner.shutdownNow(); +if (rocksDBMetricsRecordingService != null) { +rocksDBMetricsRecordingService.shutdownNow(); +} -// wait for all threads to join in a separate thread; -// save the current thread so that if it is a stream thread -// we don't attempt to join it and cause a deadlock -final Thread shutdownThread = new Thread(() -> { -// notify all the threads to stop; avoid deadlocks by stopping any -// further state reports from the thread since we're shutting down -for (final StreamThread thread : threads) { -thread.shutdown(); -} +// wait for all threads to join in a separate thread; +// save the current thread so that if it is a stream thread +// we don't attempt to join it and cause a deadlock +return new Thread(() -> { +// notify all the threads to stop; avoid deadlocks by stopping any +// further state reports from the thread since we're shutting down +for (final StreamThread thread : threads) { +thread.shutdown(); +} -for (final StreamThread thread : threads) { -try { -if (!thread.isRunning()) { -thread.join(); -} -} catch (final InterruptedException ex) { -Thread.currentThread().interrupt(); +for (final StreamThread 
thread : threads) { +try { +if (!thread.isRunning()) { +thread.join(); } +} catch (final InterruptedException ex) { +Thread.currentThread().interrupt(); } +} -if (globalStreamThread != null) { -globalStreamThread.shutdown(); -} +if (globalStreamThread != null) { +globalStreamThread.shutdown(); +} -if (globalStreamThread != null && !globalStreamThread.stillRunning()) { -try { -globalStreamThread.join(); -} catch (final InterruptedException e) { -Thread.currentThread().interrupt(); -} -globalStreamThread = null; +if (globalStreamThread != null && !globalStreamThread.stillRunning()) { +try { +globalStreamThread.join(); +} catch (final InterruptedException e) { +Thread.currentThread().interrupt(); } +globalStreamThread = null; +} -adminClient.close(); +adminClient.close(); -streamsMetrics.removeAllClientLevelMetrics(); -metrics.close(); +streamsMetrics.removeAllClientLevelMetrics(); +metrics.close(); +if (!error) { setState(State.NOT_RUNNING); -}, "kafka-streams-close-thread"); +} +}, "kafka-streams-close-thread"); +} + +private boolean close(final long timeoutMs) { +if (!setState(State.PENDING_SHUTDOWN)) { Review comment: I just realized that this is going to be a problem with the way the ERROR state is being used. IF we `closeToError` then we transition to ERROR and shut down, however `ERROR -> PENDING_SHUTDOWN` is still an allowed transition so there's nothing to prevent the shutdown from being triggered again when a user calls `close()`. And note that a lot of users most likely have a state listener at the moment which does exactly that, ie when it sees a transition to ERROR it immediately invokes close (because that's what you should do with the current semantics) Just another th
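One defensive pattern for the concern above, a user-invoked `close()` re-triggering shutdown after `closeToError()` because `ERROR -> PENDING_SHUTDOWN` is still an allowed transition, is to make the teardown itself idempotent. This is a hypothetical sketch, not the actual KafkaStreams FSM code:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class IdempotentClose {
    // Hypothetical guard: the first close() wins atomically; any later call,
    // e.g. from a state listener reacting to the ERROR transition, is a no-op.
    private final AtomicBoolean shutdownInitiated = new AtomicBoolean(false);

    public boolean close() {
        if (!shutdownInitiated.compareAndSet(false, true)) {
            return false; // teardown already triggered by an earlier caller
        }
        // ... the real teardown (joining threads, closing clients) would run
        // exactly once here ...
        return true;
    }

    public static void main(String[] args) {
        final IdempotentClose streams = new IdempotentClose();
        System.out.println(streams.close());  // prints true
        System.out.println(streams.close());  // prints false
    }
}
```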
[GitHub] [kafka] hachikuji commented on a change in pull request #9608: MINOR: Enable testLogCleanerStats
hachikuji commented on a change in pull request #9608: URL: https://github.com/apache/kafka/pull/9608#discussion_r525646356 ## File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ## @@ -815,9 +815,10 @@ class LogCleanerTest { (0 until leo.toInt by 2).forall(!keys.contains(_))) } + @Test def testLogCleanerStats(): Unit = { -// because loadFactor is 0.75, this means we can fit 2 messages in the map -val cleaner = makeCleaner(2) +// because loadFactor is 0.75, this means we can fit 3 messages in the map Review comment: Sorry, but we have several identical comments in other test cases. Are those comments also wrong?
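The arithmetic the comment under review hinges on is simple: a fixed-size map with load factor 0.75 only admits floor(slots * 0.75) keys before it counts as full. The sketch below is a generic illustration of that rule, not the actual `kafka.log.OffsetMap` sizing logic, so it cannot settle which comment in the test is correct:

```java
public class LoadFactorCapacity {
    // Generic capacity rule: entries admitted before the map is "full" is
    // the slot count scaled by the load factor, truncated to an integer.
    public static int maxEntries(final int slots, final double loadFactor) {
        return (int) (slots * loadFactor);
    }

    public static void main(String[] args) {
        System.out.println(maxEntries(2, 0.75)); // prints 1
        System.out.println(maxEntries(4, 0.75)); // prints 3
    }
}
```

Under this naive rule, `makeCleaner(2)` would fit only 1 message, so whether "2" or "3" is right depends on how the real offset map rounds its capacity, which is exactly what the reviewer is asking to be checked consistently across tests.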
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525640088 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { return close(timeoutMs); } -private boolean close(final long timeoutMs) { -if (!setState(State.PENDING_SHUTDOWN)) { -// if transition failed, it means it was either in PENDING_SHUTDOWN -// or NOT_RUNNING already; just check that all threads have been stopped -log.info("Already in the pending shutdown state, wait to complete shutdown"); -} else { -stateDirCleaner.shutdownNow(); -if (rocksDBMetricsRecordingService != null) { -rocksDBMetricsRecordingService.shutdownNow(); -} +private Thread shutdownHelper(final boolean error) { +stateDirCleaner.shutdownNow(); +if (rocksDBMetricsRecordingService != null) { +rocksDBMetricsRecordingService.shutdownNow(); +} -// wait for all threads to join in a separate thread; -// save the current thread so that if it is a stream thread -// we don't attempt to join it and cause a deadlock -final Thread shutdownThread = new Thread(() -> { -// notify all the threads to stop; avoid deadlocks by stopping any -// further state reports from the thread since we're shutting down -for (final StreamThread thread : threads) { -thread.shutdown(); -} +// wait for all threads to join in a separate thread; +// save the current thread so that if it is a stream thread +// we don't attempt to join it and cause a deadlock +return new Thread(() -> { +// notify all the threads to stop; avoid deadlocks by stopping any +// further state reports from the thread since we're shutting down +for (final StreamThread thread : threads) { +thread.shutdown(); +} -for (final StreamThread thread : threads) { -try { -if (!thread.isRunning()) { -thread.join(); -} -} catch (final InterruptedException ex) { -Thread.currentThread().interrupt(); +for (final StreamThread 
thread : threads) { +try { +if (!thread.isRunning()) { +thread.join(); } +} catch (final InterruptedException ex) { +Thread.currentThread().interrupt(); } +} -if (globalStreamThread != null) { -globalStreamThread.shutdown(); -} +if (globalStreamThread != null) { +globalStreamThread.shutdown(); +} Review comment: Why do we shut down the global thread only after all stream threads have completed their shutdown? Seems like it would be more efficient to send the shutdown signal to everyone first, and then wait for all the threads to join. Can you try this out in the followup PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
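The reviewer's suggestion, signal every thread first and only then block on joins so the shutdowns overlap, can be sketched with plain `java.lang.Thread`. Here `interrupt()` stands in for the real shutdown signal; this is not the `StreamThread`/`GlobalStreamThread` API:

```java
import java.util.ArrayList;
import java.util.List;

public class SignalThenJoin {
    public static void main(String[] args) throws InterruptedException {
        final List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            workers.add(new Thread(() -> {
                try {
                    Thread.sleep(100); // simulate in-flight work
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt(); // treat as shutdown signal
                }
            }));
        }
        workers.forEach(Thread::start);

        // Pass 1: deliver the stop signal to everyone without blocking, so all
        // threads begin shutting down concurrently.
        workers.forEach(Thread::interrupt);

        // Pass 2: only now wait for each thread to actually exit; total wait is
        // bounded by the slowest thread, not the sum of all shutdowns.
        for (final Thread worker : workers) {
            worker.join();
        }
        System.out.println("all threads joined");
    }
}
```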
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525636554 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * The handler will execute on the thread that produced the exception. + * In order to get the thread that threw the exception, Thread.currentThread(). + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * @throws NullPointerException if streamsUncaughtExceptionHandler is null. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final Consumer handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler); +synchronized (stateLock) { +if (state == State.CREATED) { +Objects.requireNonNull(streamsUncaughtExceptionHandler); +for (final StreamThread thread : threads) { +thread.setStreamsUncaughtExceptionHandler(handler); +} +if (globalStreamThread != null) { +globalStreamThread.setUncaughtExceptionHandler(handler); +} +} else { +throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. 
" + +"Current state is: " + state); +} +} +} + +private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) { +if (oldHandler) { +if (throwable instanceof RuntimeException) { +throw (RuntimeException) throwable; +} else if (throwable instanceof Error) { +throw (Error) throwable; +} else { +throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable); +} +} else { +handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT); +} +} + +private void handleStreamsUncaughtException(final Throwable throwable, +final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable); +if (oldHandler) { +log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." + +"The old handler will be ignored as long as a new handler is set."); +} +switch (action) { +case SHUTDOWN_CLIENT: +log.error("Encountered the following exception during processing " + +"and the registered exception handler opted to " + action + "." + +" The streams client is going to shut down now. ", throwable); +close(Duration.ZERO); +break; +case SHUTDOWN_APPLICATION: +if (throwable instanceof Error) { +log.error("This option requires running threads to shut down the application." + +"but the uncaught exception was an Error, which means this runtime is no " + +"longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable); +} +if (Thread.currentThread().equals(globalStreamThread) && threads.stream().noneMatch(StreamThread::isRunning)) { +log.error("Exception in global thread caused the application to attempt to shutdown." + +" This action will succeed only if there is at least one StreamThread running on this client." 
+ +" Currently there are no running threads so will now close the client."); +close(Duration.ZERO); Review comment: I think it makes more sense to transition to ERROR in this case than to NOT_RUNNING. But let's put t
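The `defaultStreamsUncaughtExceptionHandler` fallback quoted in the diff above boils down to a rethrow-or-wrap rule: runtime exceptions and errors are rethrown as-is, while checked exceptions are wrapped. A minimal, standalone sketch of that rule (plain JDK, not the Kafka source):

```java
// Standalone sketch of the "old handler" fallback described in the diff above:
// RuntimeExceptions and Errors propagate unchanged, anything checked gets
// wrapped in a RuntimeException so it can cross the uncaught-exception boundary.
class UncaughtExceptionFallback {
    static void rethrow(Throwable throwable) {
        if (throwable instanceof RuntimeException) {
            throw (RuntimeException) throwable;
        } else if (throwable instanceof Error) {
            throw (Error) throwable;
        } else {
            throw new RuntimeException(
                "Unexpected checked exception caught in the uncaught exception handler",
                throwable);
        }
    }
}
```

A caller that catches the resulting `RuntimeException` can still recover the original checked exception via `getCause()`.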
[GitHub] [kafka] hachikuji commented on pull request #9100: Add AlterISR RPC and use it for ISR modifications
hachikuji commented on pull request #9100: URL: https://github.com/apache/kafka/pull/9100#issuecomment-729316632 @jacky1193610322 I missed this comment before. It's a good question. In general, the leader will continue in its current state as long as possible. As you say, as soon as it needs to shrink/expand the ISR, it grabs the leaderAndIsr update and attempts to synchronously update the state. If Zookeeper can't be reached, then the thread gets stuck. Eventually this causes the broker to effectively deadlock, which has the side effect of preventing any Produce requests (and any other requests) from getting through. I think it's a fair point that this affords some protection for acks=1 requests, but I think we tend to view the side effect of deadlocking the broker as worse than any benefit. In KIP-500, we have an alternative approach for self-fencing. The analogous case is when the leader cannot reach the controller. We use a heartbeating mechanism to maintain liveness in the cluster. Unlike with Zookeeper, we do not rely on the session expiration event in order to tell that a broker has been declared dead. Instead if we do not get a heartbeat response from the controller before some timeout, then we will stop accepting Produce requests. I have been thinking a little bit about your suggestion to self-fence after getting an invalid version error from AlterIsr. It might help in the interim before KIP-500 is complete. I think our expectation here was that if we get an invalid version error, then the LeaderAndIsr with the updated state should soon be on the way. I suppose we could come up with reasons why that assumption might fail, so it might make sense to be a little more defensive. I will file a jira about this and we can see what others think. Thanks for the suggestion! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mattwong949 opened a new pull request #9608: MINOR: Enable testLogCleanerStats
mattwong949 opened a new pull request #9608: URL: https://github.com/apache/kafka/pull/9608 The `testLogCleanerStats` test in `LogCleanerTest.scala` was implemented but never enabled. This PR adds the @Test annotation, and also gives the test a larger map so that it passes as intended. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9568: KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops
ableegoldman commented on pull request #9568: URL: https://github.com/apache/kafka/pull/9568#issuecomment-729307123 Merged to trunk. Will cherrypick back to 2.7 once the ongoing release completes This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #9568: KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops
ableegoldman merged pull request #9568: URL: https://github.com/apache/kafka/pull/9568 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9568: KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops
ableegoldman commented on pull request #9568: URL: https://github.com/apache/kafka/pull/9568#issuecomment-729306388 All tests pass, but the build overall is broken due to a failure in the new `Travis CI - Pull Request` check. I can't find anything in the results that indicates an actual problem, so I'm just going to merge this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10734) Speedup the processing of LeaderAndIsr request
Lucas Wang created KAFKA-10734: -- Summary: Speedup the processing of LeaderAndIsr request Key: KAFKA-10734 URL: https://issues.apache.org/jira/browse/KAFKA-10734 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Assignee: Lucas Wang Consider the case where a LeaderAndIsr request contains many partitions for which the broker is asked to become the follower. Let's call these partitions *partitionsToMakeFollower*. Furthermore, let's assume the cluster has n brokers and each broker is configured to have m replica fetchers (via the num.replica.fetchers config). The broker is then likely to have (n-1) * m fetcher threads. Processing the LeaderAndIsr request requires 1. removing the "partitionsToMakeFollower" from all of the fetcher threads sequentially so that they won't be fetching from obsolete leaders, 2. adding the "partitionsToMakeFollower" to all of the fetcher threads sequentially, and 3. shutting down the idle fetcher threads sequentially (by checking the number of partitions held by each fetcher thread). On top of that, each of the 3 operations above is handled by the request handler thread (i.e. io thread), and to complete the operation, the request handler thread needs to contend for the "partitionMapLock" with the corresponding fetcher thread. In the worst case, the request handler thread is blocked (n-1) * m times for removing the partitions, another (n-1) * m times for adding the partitions, and yet another (n-1) * m times for shutting down the idle fetcher threads. Overall, all of this blocking can result in a significant delay in processing the LeaderAndIsr request. The further implication is that if the follower delays its fetching from the leader, there could be under-MinISR partitions in the cluster, causing unavailability for clients. This ticket is created to track speeding up the processing of the LeaderAndIsr request. -- This message was sent by Atlassian Jira (v8.3.4#803005)
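The worst-case blocking described in the ticket is simple arithmetic; a small helper makes the bound explicit (illustrative only, not Kafka code):

```java
// Back-of-the-envelope helper for the worst case described in KAFKA-10734:
// with n brokers and m replica fetchers per broker there are (n-1) * m
// fetcher threads, and the request handler thread may block once per fetcher
// thread for each of the three steps (remove partitions, add partitions,
// shut down idle fetchers).
class LeaderAndIsrContention {
    static int worstCaseLockAcquisitions(int nBrokers, int mFetchers) {
        int fetcherThreads = (nBrokers - 1) * mFetchers;
        return 3 * fetcherThreads;
    }
}
```

For example, a 10-broker cluster with num.replica.fetchers=4 yields up to 3 * 9 * 4 = 108 contended lock acquisitions for a single LeaderAndIsr request.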
[GitHub] [kafka] hachikuji commented on a change in pull request #9539: KAFKA-10634: Adding LeaderId to Voters list in LeaderChangeMessage
hachikuji commented on a change in pull request #9539: URL: https://github.com/apache/kafka/pull/9539#discussion_r525603946 ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -394,7 +394,7 @@ public void transitionToLeader(long epochStartOffset) throws IOException { localId, epoch(), epochStartOffset, -voters +voters, candidateState.grantingVoters() Review comment: nit: can we move to next line to follow the convention here? ## File path: clients/src/main/resources/common/message/LeaderChangeMessage.json ## @@ -22,7 +22,9 @@ {"name": "LeaderId", "type": "int32", "versions": "0+", "about": "The ID of the newly elected leader"}, {"name": "Voters", "type": "[]Voter", "versions": "0+", - "about": "The voters who voted for the current leader"} + "about": "The voters who voted at the time of election"}, +{"name": "EndorsingVoters", "type": "[]Voter", "versions": "0+", Review comment: Could we use "Granting" instead of "Endorsing"? I think this is a more consistent term considering `LeaderState`. ## File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java ## @@ -709,6 +709,9 @@ static void verifyLeaderChangeMessage( assertEquals(leaderId, leaderChangeMessage.leaderId()); assertEquals(voters.stream().map(voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toList()), leaderChangeMessage.voters()); +assertEquals(voters.stream().map(voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toSet()), Review comment: Hmm.. It's curious that we can always rely on the full voter set. Do we not have any test cases where we have not received votes from all members? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests
hachikuji commented on a change in pull request #9564: URL: https://github.com/apache/kafka/pull/9564#discussion_r525598620 ## File path: clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java ## @@ -24,4 +24,11 @@ void onComplete(ClientResponse response); +/** + * Fire when the request transmission hits a fatal exception. + * + * @param exception the thrown exception + */ +default void onFailure(RuntimeException exception) { Review comment: Here is the interface for `KafkaClient`: ``` ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder requestBuilder, long createdTimeMs, boolean expectResponse, int requestTimeoutMs, RequestCompletionHandler callback); ``` It is misleading to add an `onFailure` callback to `RequestCompletionHandler` if it is not going to be used by `KafkaClient` implementations such as `NetworkClient`. The usage in `ConsumerNetworkClient` is different because it is internal. In general, we should avoid leaking implementation details up to the interfaces. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
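The API concern raised above can be reproduced with a stdlib-only sketch: an interface that gains a default `onFailure`, which existing implementations silently inherit as a no-op (names mirror the diff; this is not the Kafka interface):

```java
// Standalone sketch of the pattern under discussion: a completion-handler
// interface gaining a default onFailure. Existing implementations compile
// unchanged but inherit a no-op, which is the reviewer's concern -- the
// method is misleading if core client implementations never invoke it.
interface SketchCompletionHandler {
    void onComplete(String response);

    // Hypothetical default added for illustration; not part of any real API.
    default void onFailure(RuntimeException exception) {
        // no-op by default: callers that don't know about this hook never fire it
    }
}
```

Because `onFailure` has a default body, `SketchCompletionHandler` remains a functional interface and can still be implemented with a lambda that only handles completions.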
[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol
hachikuji commented on a change in pull request #9401: URL: https://github.com/apache/kafka/pull/9401#discussion_r525586697 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ## @@ -560,13 +562,24 @@ private void handleProduceResponse(ClientResponse response, Maphttps://issues.apache.org/jira/browse/KAFKA-10696 Review comment: nit: since we have the jira for tracking, can we remove the TODO? A few more of these in the PR. ## File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java ## @@ -100,16 +113,92 @@ public void produceResponseRecordErrorsTest() { ProduceResponse response = new ProduceResponse(responseData); Struct struct = response.toStruct(ver); assertEquals("Should use schema version " + ver, ApiKeys.PRODUCE.responseSchema(ver), struct.schema()); -ProduceResponse.PartitionResponse deserialized = new ProduceResponse(struct).responses().get(tp); +ProduceResponse.PartitionResponse deserialized = new ProduceResponse(new ProduceResponseData(struct, ver)).responses().get(tp); if (ver >= 8) { assertEquals(1, deserialized.recordErrors.size()); assertEquals(3, deserialized.recordErrors.get(0).batchIndex); assertEquals("Record error", deserialized.recordErrors.get(0).message); assertEquals("Produce failed", deserialized.errorMessage); } else { assertEquals(0, deserialized.recordErrors.size()); -assertEquals(null, deserialized.errorMessage); +assertNull(deserialized.errorMessage); } } } + +/** + * the schema in this test is from previous code and the automatic protocol should be compatible to previous schema. + */ +@Test +public void testCompatibility() { Review comment: I think this test might be overkill. We haven't done anything like this for the other converted APIs. It's a little similar to `MessageTest.testRequestSchemas`, which was useful verifying the generated schemas when the message generator was being written. 
Soon `testRequestSchemas` will be redundant, so I guess we have to decide if we just trust the generator and our compatibility system tests or if we want some other canonical representation. Thoughts? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
wcarlson5 commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525582298 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +private long getCacheSizePerThread(final int numStreamThreads) { +return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0)); +} + +private void resizeThreadCache(final int numStreamThreads) { +if (numStreamThreads < 0) { Review comment: that is a good point This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9607: [KAFKA-10722] doc: Described the types of the stores used
mjsax commented on a change in pull request #9607: URL: https://github.com/apache/kafka/pull/9607#discussion_r525581704 ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -870,6 +870,12 @@ Stateful transformations depend on state for processing inputs and producing outputs and require a state store associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per window. In join operations, a windowing state store is used to collect all of the records received so far within the defined window boundary. + Note: Following store types are used regardless of the possibly specified type (via the parameter materialized): + + non-windowed aggregations and plain KTables use TimestampedKeyValueStores + time-windowed aggregations and kstream-kstream joins use TimestampedWindowStores Review comment: `kstream-kstream` -> `KStream-KStream` ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -870,6 +870,12 @@ Stateful transformations depend on state for processing inputs and producing outputs and require a state store associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per window. In join operations, a windowing state store is used to collect all of the records received so far within the defined window boundary. + Note: Following store types are used regardless of the possibly specified type (via the parameter materialized): + + non-windowed aggregations and plain KTables use TimestampedKeyValueStores Review comment: Maybe we should say `non-windowed KTable` -- "plain" does not sound technical. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9606: [KAFKA-10722] doc: Improve JavaDoc for KGroupedStream.aggregate
mjsax commented on a change in pull request #9606: URL: https://github.com/apache/kafka/pull/9606#discussion_r525580343 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java ## @@ -381,7 +381,8 @@ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. * * - * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by + * an internal changelog topic that will be created in Kafka. Review comment: Should apply the same improvement to `reduce()` and `count()` overloads? Also for `CogroupedKStream#aggregate()`? What about `TimeWindowedKStream` and `TimeWindowedCogroupedKStream` ? Also `StreamsBuilder#table()` (and `#globalTable()`) might need an update? ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java ## @@ -438,7 +439,8 @@ * query the value of the key on a parallel running instance of your Kafka Streams application. * * - * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore} -- regardless of what + * is specified in the parameter {@materialized}) will be backed by an internal changelog topic that will be created in Kafka. Review comment: `{@materialized}` is not valid markup as far as I know. Should we `{@code materialized}`? (same below) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol
hachikuji commented on pull request #9401: URL: https://github.com/apache/kafka/pull/9401#issuecomment-729258346 Here are a couple additional test runs. This was on Ubuntu 20 (ami-00831fc7c1e3ddc60). The machine type was m5a.xlarge with 200GB gp2 EBS storage. One instance was running the broker and one instance was running the producer perf test. Commands: ``` bin/kafka-topics.sh --create --topic foo --replication-factor 1 --partitions 10 --bootstrap-server $BROKER bin/kafka-producer-perf-test.sh --topic foo --num-records 25000 --throughput -1 --record-size 256 --producer-props bootstrap.servers=$BROKER ``` Here are the results: ``` Patch: 25000 records sent, 826003.925171 records/sec (201.66 MB/sec), 149.39 ms avg latency, 1623.00 ms max latency, 131 ms 50th, 380 ms 95th, 464 ms 99th, 650 ms 99.9th. 25000 records sent, 825684.740355 records/sec (201.58 MB/sec), 149.22 ms avg latency, 1276.00 ms max latency, 124 ms 50th, 364 ms 95th, 451 ms 99th, 775 ms 99.9th. Trunk: 25000 records sent, 833144.487250 records/sec (203.40 MB/sec), 148.20 ms avg latency, 1361.00 ms max latency, 111 ms 50th, 437 ms 95th, 551 ms 99th, 807 ms 99.9th. 25000 records sent, 810927.409022 records/sec (197.98 MB/sec), 152.59 ms avg latency, 1430.00 ms max latency, 127 ms 50th, 382 ms 95th, 467 ms 99th, 809 ms 99.9th. ``` Given variance in these tests, I think we're probably inline with trunk. I looked at the flame graph as well and did not observe any substantial difference in performance. Here are a few interesting highlights from one run. This patch is listed first with trunk second. 
`Sender.sendProduceRequests`: ![Screen Shot 2020-11-17 at 2 36 34 PM](https://user-images.githubusercontent.com/12502538/99459188-89875600-28e2-11eb-93ed-1ffb46b74e63.png) ![Screen Shot 2020-11-17 at 2 36 49 PM](https://user-images.githubusercontent.com/12502538/99459193-8b511980-28e2-11eb-9d19-51bbc2d7ca56.png) `KafkaChannel.write`: ![Screen Shot 2020-11-17 at 2 32 56 PM](https://user-images.githubusercontent.com/12502538/99459229-9c018f80-28e2-11eb-9431-61a450291a72.png) ![Screen Shot 2020-11-17 at 2 33 17 PM](https://user-images.githubusercontent.com/12502538/99459235-9e63e980-28e2-11eb-92fe-0a472fbee0d6.png) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
mjsax commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525562156 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +private long getCacheSizePerThread(final int numStreamThreads) { +return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0)); +} + +private void resizeThreadCache(final int numStreamThreads) { +if (numStreamThreads < 0) { Review comment: Well, `getCacheSizePerThread` would eventually return zero (with growing number of threads), which means that every put() into the cache would result in an immediate eviction. So I don't think we need to do anything for this corner case. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
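The sizing logic under discussion, extracted into a standalone method (the formula is taken from the diff; everything else here is an illustrative assumption), shows why the per-thread cache bottoms out at zero under integer division:

```java
// Sketch of the per-thread cache sizing from the diff above. The total cache
// is divided evenly across stream threads, plus one extra share when a
// global thread exists. With integer division the result eventually reaches
// zero as the thread count grows, at which point every put() into the cache
// evicts immediately -- the corner case discussed in the review.
class CacheSizing {
    static long cacheSizePerThread(long totalCacheSize,
                                   int numStreamThreads,
                                   boolean hasGlobalThread) {
        return totalCacheSize / (numStreamThreads + (hasGlobalThread ? 1 : 0));
    }
}
```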
[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
mjsax commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525540418 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +private long getCacheSizePerThread(final int numStreamThreads) { +return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0)); +} + +private void resizeThreadCache(final int numStreamThreads) { +if (numStreamThreads < 0) { Review comment: Yes, it can be zero, but the check says `< 0`, so it would always evaluate to false? And if we have zero threads, we should not resize the cache as we might end up in an infinite loop? But we would only call this method if we "shrink", ie, if the thread count grows, but it can never grow from negative to zero, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
wcarlson5 commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525561178 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +private long getCacheSizePerThread(final int numStreamThreads) { +return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0)); +} + +private void resizeThreadCache(final int numStreamThreads) { +if (numStreamThreads < 0) { Review comment: That is a good point. Maybe what we need to do it put a minimum size of cache to limit how many stream threads an instance can have? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH
guozhangwang commented on a change in pull request #9569: URL: https://github.com/apache/kafka/pull/9569#discussion_r525559599 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/Callback.java ## @@ -40,6 +40,7 @@ * RecordTooLargeException * UnknownServerException * UnknownProducerIdException + * InvalidProducerEpoch Review comment: nit: `InvalidProducerEpochException`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10733) Enforce exception thrown for KafkaProducer txn APIs
Boyang Chen created KAFKA-10733: --- Summary: Enforce exception thrown for KafkaProducer txn APIs Key: KAFKA-10733 URL: https://issues.apache.org/jira/browse/KAFKA-10733 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen In general, KafkaProducer could throw both fatal and non-fatal errors as KafkaException, which makes the exception catching hard. Furthermore, not every single fatal exception (checked) is marked on the function signature yet as of 2.7. We should have a general supporting strategy in long term for this matter, as whether to declare all non-fatal exceptions as wrapped KafkaException while extracting all fatal ones, or just add a flag to KafkaException indicating fatal or not, to maintain binary compatibility. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10722) Timestamped store is used even if not desired
[ https://issues.apache.org/jira/browse/KAFKA-10722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17234030#comment-17234030 ] fml2 commented on KAFKA-10722: -- I've created [https://github.com/apache/kafka/pull/9606] and [https://github.com/apache/kafka/pull/9607]. > Timestamped store is used even if not desired > - > > Key: KAFKA-10722 > URL: https://issues.apache.org/jira/browse/KAFKA-10722 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.1, 2.6.0 >Reporter: fml2 >Priority: Major > > I have a stream which I then group and aggregate (this results in a KTable). > When aggregating, I explicitly tell to materialize the result table using a > usual (not timestamped) store. > After that, the KTable is filtered and streamed. This stream is processed by > a processor that accesses the store. > The problem/bug is that even if I tell to use a non-timestamped store, a > timestamped one is used, which leads to a ClassCastException in the processor > (it iterates over the store and expects the items to be of type "KeyValue" > but they are of type "ValueAndTimestamp"). > Here is the code (schematically). > First, I define the topology: > {code:java} > KTable table = ...aggregate( > initializer, // initializer for the KTable row > aggregator, // aggregator > Materialized.as(Stores.persistentKeyValueStore("MyStore")) // <-- > Non-Timestamped! > .withKeySerde(...).withValueSerde(...)); > table.toStream().process(theProcessor); > {code} > In the class for the processor: > {code:java} > public void init(ProcessorContext context) { >var store = context.getStateStore("MyStore"); // Returns a > TimestampedKeyValueStore! > } > {code} > A timestamped store is returned even if I explicitly told to use a > non-timestamped one! > Â > I tried to find the cause for this behaviour and think that I've found it. 
It > lies in this line: > [https://github.com/apache/kafka/blob/cfc813537e955c267106eea989f6aec4879e14d7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L241] > There, TimestampedKeyValueStoreMaterializer is used regardless of whether > materialization supplier is a timestamped one or not. > I think this is a bug. -- This message was sent by Atlassian Jira (v8.3.4#803005)
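The failure mode reported in this ticket can be illustrated with a stdlib-only sketch (the wrapper class below is a hypothetical stand-in for Kafka's `ValueAndTimestamp`, not the real type): the store hands back wrapped values while the processor casts each entry to the plain value type.

```java
import java.util.List;

// Illustrative sketch of KAFKA-10722's symptom: the store materializes
// timestamp-wrapped values even though a plain store was requested, so a
// processor that casts entries to the plain value type hits a
// ClassCastException while iterating.
class TimestampedWrapperSketch {
    // Hypothetical stand-in for Kafka's ValueAndTimestamp<V> wrapper.
    static final class ValueAndTimestamp {
        final String value;
        final long timestamp;
        ValueAndTimestamp(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    static boolean castFails(List<Object> storeContents) {
        try {
            for (Object entry : storeContents) {
                String plain = (String) entry; // the processor expects plain values
                plain.length();
            }
            return false;
        } catch (ClassCastException e) {
            return true; // wrapped entries cannot be cast to the plain type
        }
    }
}
```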
[GitHub] [kafka] fml2 opened a new pull request #9607: [KAFKA-10722] doc: Described the types of the stores used
fml2 opened a new pull request #9607: URL: https://github.com/apache/kafka/pull/9607 This is related to KAFKA-10722 Sometimes it's important to know the correct type of the store used by streams. E.g. when iterating over its items. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
wcarlson5 commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525541523 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java ## @@ -71,6 +72,22 @@ public long flushes() { return numFlushes; } +public void resize(final long newCacheSizeBytes) { +final boolean shrink = newCacheSizeBytes < this.maxCacheSizeBytes; Review comment: yep This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
mjsax commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525540418

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); }
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }
+
+    private void resizeThreadCache(final int numStreamThreads) {
+        if (numStreamThreads < 0) {

Review comment: Yes, it can be zero, but the check says `< 0`, so it would always evaluate to false? And if we have zero threads, we should not resize the cache, as we might end up in an infinite loop? But we would only call this method if we "shrink", i.e., if the thread count grows, but it can never grow from negative to zero, right?
[GitHub] [kafka] kowshik commented on pull request #9602: MINOR: Use string interpolation in FinalizedFeatureCache
kowshik commented on pull request #9602: URL: https://github.com/apache/kafka/pull/9602#issuecomment-729226900

@dajac thanks for the review :)
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
wcarlson5 commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525536944

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); }
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }
+
+    private void resizeThreadCache(final int numStreamThreads) {
+        if (numStreamThreads < 0) {

Review comment: It can be zero if you have a global thread, but since this is internal, the check might not be entirely necessary.
[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
mjsax commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525534167

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
## @@ -71,6 +72,22 @@ public long flushes() { return numFlushes; }
+    public void resize(final long newCacheSizeBytes) {
+        final boolean shrink = newCacheSizeBytes < this.maxCacheSizeBytes;

Review comment: nit: we can remove `this.` now (same on the next line)
[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
mjsax commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525533593

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); }
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }
+
+    private void resizeThreadCache(final int numStreamThreads) {
+        if (numStreamThreads < 0) {

Review comment: Can it be smaller than `0`? Should the test be `<= 0` or `< 1` instead?
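The per-thread cache-size computation the reviewers are discussing can be sketched as follows. `CacheSizer` and its parameter names are illustrative stand-ins, not the actual `KafkaStreams` internals; the sketch also guards the non-positive share count that the `< 0` vs. `<= 0` debate above is about.

```java
// Illustrative sketch (hypothetical names, not the actual KafkaStreams code):
// the total cache is split evenly across stream threads, plus one extra share
// for the global thread when a global topology exists.
public class CacheSizer {
    public static long cacheSizePerThread(final long totalCacheSizeBytes,
                                          final int numStreamThreads,
                                          final boolean hasGlobalTopology) {
        final int shares = numStreamThreads + (hasGlobalTopology ? 1 : 0);
        if (shares <= 0) {
            // zero stream threads and no global thread: nothing to size,
            // and dividing would throw ArithmeticException
            return 0;
        }
        return totalCacheSizeBytes / shares;
    }

    public static void main(String[] args) {
        System.out.println(cacheSizePerThread(100, 4, false)); // 25
        System.out.println(cacheSizePerThread(100, 4, true));  // 20
        // zero stream threads but a global thread: the global thread gets it all
        System.out.println(cacheSizePerThread(100, 0, true));  // 100
    }
}
```

The zero-stream-thread case is exactly why the denominator includes the global-thread share: with one global thread and no stream threads, the divisor is 1, not 0.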
[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
mjsax commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525532300

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); }
+    private void resizeThreadCache(final int numStreamThreads) {
+        final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment: > LGTM? If this is a question, should it be LGTY? 😂
[GitHub] [kafka] fml2 opened a new pull request #9606: doc: Improve JavaDoc for KGroupedStream.aggregate
fml2 opened a new pull request #9606: URL: https://github.com/apache/kafka/pull/9606

Document that the store used internally is always a timestamped one. This is related to KAFKA-10722. No tests are necessary because only JavaDoc was changed.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] kowshik commented on a change in pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling
kowshik commented on a change in pull request #9596: URL: https://github.com/apache/kafka/pull/9596#discussion_r525527982

## File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
## @@ -83,6 +87,51 @@ class LogManagerTest { log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0) }
+  /**
+   * Tests that all internal futures are completed before LogManager.shutdown() returns to the
+   * caller during error situations.
+   */
+  @Test
+  def testHandlingExceptionsDuringShutdown(): Unit = {
+    logManager.shutdown()
+
+    // We create two directories logDir1 and logDir2 to help effectively test error handling
+    // during LogManager.shutdown().
+    val logDir1 = TestUtils.tempDir()
+    val logDir2 = TestUtils.tempDir()
+    logManager = createLogManager(Seq(logDir1, logDir2))
+    assertEquals(2, logManager.liveLogDirs.size)
+    logManager.startup()
+
+    val log1 = logManager.getOrCreateLog(new TopicPartition(name, 0), () => logConfig)
+    val log2 = logManager.getOrCreateLog(new TopicPartition(name, 1), () => logConfig)
+
+    val logFile1 = new File(logDir1, name + "-0")
+    assertTrue(logFile1.exists)
+    val logFile2 = new File(logDir2, name + "-1")
+    assertTrue(logFile2.exists)
+
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+    log1.takeProducerSnapshot()
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+
+    log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
+    log2.takeProducerSnapshot()
+    log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
+
+    // This should cause log1.close() to fail during the LogManager shutdown sequence.
+    FileUtils.deleteDirectory(logFile1)

Review comment: It depends on the kind of error, but we do log the error information from within `KafkaServer.shutdown()` today.
[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests
abbccdda commented on a change in pull request #9564: URL: https://github.com/apache/kafka/pull/9564#discussion_r525507232

## File path: clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java
## @@ -24,4 +24,11 @@ void onComplete(ClientResponse response);
+    /**
+     * Fire when the request transmission hits a fatal exception.
+     *
+     * @param exception the thrown exception
+     */
+    default void onFailure(RuntimeException exception) {

Review comment: We do have a case in `ConsumerNetworkClient` which adds an `onFailure` callback. To me it makes sense to include it as part of the RequestCompletionHandler interface.
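The backward-compatibility argument for adding `onFailure` as a `default` method can be illustrated with a minimal sketch. The types below are simplified stand-ins, not Kafka's actual `RequestCompletionHandler` or `ClientResponse`: a pre-existing lambda that only implements the single abstract method keeps compiling, and the default supplies failure behavior.

```java
// Sketch of the pattern under discussion (hypothetical simplified types):
// adding a default method to an interface keeps existing implementors
// source- and binary-compatible.
public class CompletionHandlerDemo {
    public interface RequestCompletionHandler {
        void onComplete(String response);

        // Existing implementations inherit this default and need no changes.
        default void onFailure(RuntimeException exception) {
            throw exception; // by default, propagate the failure to the caller
        }
    }

    public static void main(String[] args) {
        final StringBuilder log = new StringBuilder();
        // A handler written before onFailure existed still works as a lambda,
        // because the interface still has exactly one abstract method.
        final RequestCompletionHandler handler =
                response -> log.append("completed: ").append(response);

        handler.onComplete("ok");
        System.out.println(log); // completed: ok

        try {
            handler.onFailure(new RuntimeException("disconnected"));
        } catch (RuntimeException e) {
            System.out.println("failed: " + e.getMessage()); // failed: disconnected
        }
    }
}
```

An implementor that needs custom failure handling simply overrides `onFailure`, as the `ConsumerNetworkClient` case mentioned above would.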
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
wcarlson5 commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525488664

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); }
+    private void resizeThreadCache(final int numStreamThreads) {
+        final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment: Moved to a new method. Glad we got that cleared up. LGTM?
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
wcarlson5 commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525464495

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
## @@ -71,6 +72,26 @@ public long flushes() { return numFlushes; }
+    public void resize(final long maxCacheSizeBytes) {
+        final boolean shrink = maxCacheSizeBytes < this.maxCacheSizeBytes;
+        this.maxCacheSizeBytes = maxCacheSizeBytes;
+        if (shrink) {
+            final CircularIterator circularIterator = new CircularIterator<>(caches.values());
+            while (sizeBytes() > maxCacheSizeBytes) {
+                if (!circularIterator.hasNext()) {
+                    log.error("Unable to remove any more entries as all caches are empty");

Review comment: Yeah, in retrospect it was not very clear. Hopefully it's better this way now.
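The shrink path being reviewed (evict entries round-robin across the per-task caches until the total fits, and stop once everything is empty) can be sketched with plain collections. `MiniThreadCache` is an illustrative stand-in for the real `ThreadCache`, using byte arrays as stand-in cache entries.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not the real ThreadCache): shrinking evicts the oldest
// entry from each per-task cache in round-robin order until the total size
// fits the new maximum; growing never evicts.
public class MiniThreadCache {
    private final List<ArrayDeque<byte[]>> caches = new ArrayList<>();
    private long maxCacheSizeBytes;

    public MiniThreadCache(final long maxCacheSizeBytes) {
        this.maxCacheSizeBytes = maxCacheSizeBytes;
    }

    public void addCache(final ArrayDeque<byte[]> cache) {
        caches.add(cache);
    }

    public long sizeBytes() {
        long total = 0;
        for (final ArrayDeque<byte[]> cache : caches) {
            for (final byte[] entry : cache) {
                total += entry.length;
            }
        }
        return total;
    }

    public void resize(final long newCacheSizeBytes) {
        final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes;
        maxCacheSizeBytes = newCacheSizeBytes;
        if (!shrink) {
            return; // growing never requires eviction
        }
        int next = 0; // round-robin cursor over the per-task caches
        while (sizeBytes() > maxCacheSizeBytes) {
            boolean evicted = false;
            for (int n = 0; n < caches.size() && !evicted; n++) {
                final ArrayDeque<byte[]> cache = caches.get((next + n) % caches.size());
                if (!cache.isEmpty()) {
                    cache.pollFirst(); // evict the oldest entry from this cache
                    next = (next + n + 1) % caches.size();
                    evicted = true;
                }
            }
            if (!evicted) {
                break; // all caches empty: the "unable to remove any more entries" case
            }
        }
    }
}
```

The `break` when nothing can be evicted mirrors the concern in the thread: checking emptiness inside the loop must terminate it rather than spin forever.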
[GitHub] [kafka] cadonna commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
cadonna commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525463645

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); }
+    private void resizeThreadCache(final int numStreamThreads) {
+        final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment: If this line is duplicated, it should go in a method. When I proposed to move it inline, I was apparently not aware that the same line was used somewhere else.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
wcarlson5 commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525463165

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
## @@ -589,6 +593,10 @@ private void subscribeConsumer() { } }
+    public void resizeCache(final long size) {
+        cacheResizer.accept(size);

Review comment: I think we can; that's probably a good idea.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
wcarlson5 commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525459352

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); }
+    private void resizeThreadCache(final int numStreamThreads) {
+        final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment: I think it was about readability. I might be misremembering, though, as it was a conversation we had last week.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525444958

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } }
+    /**
+     * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+     * throws an unexpected exception.
+     * These might be exceptions indicating rare bugs in Kafka Streams, or they
+     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+     * logic.
+     * The handler will execute on the thread that produced the exception.
+     * In order to get the thread use that threw the exception, Thread.currentThread().

Review comment: need to remove `use`
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525442248

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } }
+    /**
+     * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+     * throws an unexpected exception.
+     * These might be exceptions indicating rare bugs in Kafka Streams, or they
+     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+     * logic.
+     * The handler will execute on the thread that produced the exception.
+     * In order to get the thread use that threw the exception, Thread.currentThread().
+     *
+     * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+     * thread that encounters such an exception.
+     *
+     * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final Consumer handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                Objects.requireNonNull(streamsUncaughtExceptionHandler);
+                for (final StreamThread thread : threads) {
+                    thread.setStreamsUncaughtExceptionHandler(handler);
+                }
+                if (globalStreamThread != null) {
+                    globalStreamThread.setUncaughtExceptionHandler(handler);
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                    "Current state is: " + state);
+            }
+        }
+    }
+
+    private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) {
+        if (oldHanlder) {

Review comment: oops
[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
mjsax commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525439819

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
## @@ -71,6 +72,26 @@ public long flushes() { return numFlushes; }
+    public void resize(final long maxCacheSizeBytes) {
+        final boolean shrink = maxCacheSizeBytes < this.maxCacheSizeBytes;
+        this.maxCacheSizeBytes = maxCacheSizeBytes;
+        if (shrink) {
+            final CircularIterator circularIterator = new CircularIterator<>(caches.values());
+            while (sizeBytes() > maxCacheSizeBytes) {
+                if (!circularIterator.hasNext()) {
+                    log.error("Unable to remove any more entries as all caches are empty");

Review comment: I see. I guess the misleading fact was that this check was done inside the while loop.
[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
mjsax commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525437647

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
## @@ -589,6 +593,10 @@ private void subscribeConsumer() { } }
+    public void resizeCache(final long size) {
+        cacheResizer.accept(size);

Review comment: Ah, I see. Should we pass `java.util.function.Consumer cacheResizer` into the `StreamThread` constructor for this case instead?
[GitHub] [kafka] cadonna commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
cadonna commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525169791

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } }
+    /**
+     * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+     * throws an unexpected exception.
+     * These might be exceptions indicating rare bugs in Kafka Streams, or they
+     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+     * logic.
+     * The handler will execute on the thread that produced the exception.
+     * In order to get the thread use that threw the exception, Thread.currentThread().

Review comment: There is something wrong in this sentence.

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } }
+     * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+     * thread that encounters such an exception.
+     *
+     * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final Consumer handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                Objects.requireNonNull(streamsUncaughtExceptionHandler);
+                for (final StreamThread thread : threads) {
+                    thread.setStreamsUncaughtExceptionHandler(handler);
+                }
+                if (globalStreamThread != null) {
+                    globalStreamThread.setUncaughtExceptionHandler(handler);
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                    "Current state is: " + state);
+            }
+        }
+    }
+
+    private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) {
+        if (oldHanlder) {

Review comment: `oldHanlder` -> `oldHandler`

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
## @@ -559,18 +552,52 @@ void runLoop() { } }
        } catch (final TaskCorruptedException e) {
            log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
                "Will close the task as dirty and re-create and bootstrap from scratch.", e);
            try {
                taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
            } catch (final TaskMigratedException taskMigrated) {
                handleTaskMigrated(taskMigrated);
            }
        } catch (final TaskMigratedException e) {
            handleTaskMigrated(e);
+        } catch (final UnsupportedVersionException e) {
+            final String errorMessage = e.getMessage();
+            if (errorMessage != null &&
+                errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {
+
+                log.error("Shutting down because the Kafka cluster seems to be on a too old version. " +
+                    "Setting {}=\"{}\" requires broker version 2.5 or higher.",
+                    StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
+                    EXACTLY_ONCE_BETA);
+

Review comment: nit: remove line
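The javadoc requirements under review (the handler must be thread-safe because it is shared among all threads, and it runs on the thread that threw) can be demonstrated with plain `java.lang.Thread` APIs. This sketch is illustrative and independent of Kafka Streams; `SharedHandlerDemo` and its names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of the pattern: one shared, thread-safe handler installed on
// a worker thread; the handler executes on the thread that threw, so
// Thread.currentThread() (the handler's `thread` argument) identifies it.
public class SharedHandlerDemo {
    public static String runWorker() {
        // Shared state must be thread-safe, hence the atomic reference.
        final AtomicReference<String> failure = new AtomicReference<>("none");

        final Thread.UncaughtExceptionHandler handler = (thread, throwable) ->
                failure.set(thread.getName() + ": " + throwable.getMessage());

        final Thread worker = new Thread(() -> {
            throw new RuntimeException("boom"); // uncaught on purpose
        }, "worker-1");
        worker.setUncaughtExceptionHandler(handler);
        worker.start();
        try {
            worker.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorker()); // worker-1: boom
    }
}
```

Because the same handler instance would be installed on every stream thread, any state it touches needs synchronization, which is exactly what the javadoc note in the diff above warns about.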
[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes
mjsax commented on a change in pull request #9572: URL: https://github.com/apache/kafka/pull/9572#discussion_r525435452

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); }
+    private void resizeThreadCache(final int numStreamThreads) {
+        final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment: Not sure why the `totalCacheSize` check is relevant for avoiding code duplication?