[jira] [Commented] (KAFKA-8115) Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated
[ https://issues.apache.org/jira/browse/KAFKA-8115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17603914#comment-17603914 ]

Bruno Cadonna commented on KAFKA-8115:
--------------------------------------

Failed again on JDK17:
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12600/7/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_17_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated___2/

> Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated
> -------------------------------------------------------------------
>
>                 Key: KAFKA-8115
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8115
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, unit tests
>    Affects Versions: 2.3.0
>            Reporter: Matthias J. Sax
>            Priority: Critical
>              Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3254/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/testTaskRequestWithOldStartMsGetsUpdated/]
> {quote}org.junit.runners.model.TestTimedOutException: test timed out after 12 milliseconds
> at java.base@11.0.1/jdk.internal.misc.Unsafe.park(Native Method)
> at java.base@11.0.1/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
> at java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
> at java.base@11.0.1/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454)
> at java.base@11.0.1/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709)
> at app//org.apache.kafka.trogdor.rest.JsonRestServer.waitForShutdown(JsonRestServer.java:157)
> at app//org.apache.kafka.trogdor.agent.Agent.waitForShutdown(Agent.java:123)
> at app//org.apache.kafka.trogdor.common.MiniTrogdorCluster.close(MiniTrogdorCluster.java:285)
> at app//org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated(CoordinatorTest.java:596)
> at java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at java.base@11.0.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base@11.0.1/java.lang.reflect.Method.invoke(Method.java:566)
> at app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
> at app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
> at java.base@11.0.1/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base@11.0.1/java.lang.Thread.run(Thread.java:834){quote}
> STDOUT
> {quote}[2019-03-15 09:23:41,364] INFO Creating MiniTrogdorCluster with agents: node02 and coordinator: node01 (org.apache.kafka.trogdor.common.MiniTrogdorCluster:135)
> [2019-03-15 09:23:41,595] INFO Logging initialized @13340ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:193)
> [2019-03-15 09:23:41,752] INFO Starting REST server (org.apache.kafka.trogdor.rest.JsonRestServer:89)
> [2019-03-15 09:23:41,912] INFO Registered resource org.apache.kafka.trogdor.agent.AgentRestResource@3fa38ceb (org.apache.kafka.trogdor.rest.JsonRestServer:94)
> [2019-03-15 09:23:42,178] INFO jetty-9.4.14.v20181114; built: 2018-11-14T21:20:31.478Z; git: c4550056e785fb5665914545889f21dc136ad9e6; jvm 11.0.1+13-LTS (org.eclipse.jetty.server.Server:370)
> [2019-03-15 09:23:42,360] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session:365)
> [2019-03-15 09:23:42,362] INFO No SessionScavenger set, using defaults (org.eclipse.jetty.server.session:370)
> [2019-03-15 09:23:42,370] INFO node0 Scavenging every 66ms (org.eclipse.jetty.server.session:149)
> [2019-03-15 09:23:44,412] INFO Started o.e.j.s.ServletContextHandler@335a5293\{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:855)
> [2019-03-15 09:23:44,473] INFO Started ServerConnector@79a93bf1\{HTTP/1.1,[http/1.1]}{0.0.0.0:33477} (org.eclipse.jetty.server.AbstractConnector:292)
> [2019-03-15 09:23:44,474] INFO Started @16219ms (org.eclipse.jetty.server.Server:407)
> [2019-03-15 09:23:44,475] INFO REST server listening at [http://127.0.1.1:33477/] (org
[jira] [Comment Edited] (KAFKA-8115) Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated
[ https://issues.apache.org/jira/browse/KAFKA-8115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17603914#comment-17603914 ]

Bruno Cadonna edited comment on KAFKA-8115 at 9/14/22 7:01 AM:
---------------------------------------------------------------

Failed again with timeout on JDK17:
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12600/7/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_17_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated___2/

was (Author: cadonna):
Failed again on JDK17:
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12600/7/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_17_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated___2/
[GitHub] [kafka] cadonna commented on pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation
cadonna commented on PR #12600:
URL: https://github.com/apache/kafka/pull/12600#issuecomment-1246332926

Build failures are unrelated:
```
Build / JDK 8 and Scala 2.12 / kafka.test.ClusterTestExtensionsTest.[1] Type=ZK, Name=Generated Test, MetadataVersion=3.3-IV3, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna merged pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation
cadonna merged PR #12600:
URL: https://github.com/apache/kafka/pull/12600
[GitHub] [kafka] nizhikov commented on a diff in pull request #12632: KAFKA-12878 Support --bootstrap-server in kafka-streams-application-reset tool
nizhikov commented on code in PR #12632:
URL: https://github.com/apache/kafka/pull/12632#discussion_r970457373

## core/src/main/scala/kafka/tools/StreamsResetter.java: ##
@@ -213,8 +217,12 @@ private void parseArguments(final String[] args) {
             .ofType(String.class)
             .describedAs("id")
             .required();
-        bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
-            .withRequiredArg()
+        bootstrapServerOption = optionParser.accepts("bootstrap-server", "The server(s) to use for bootstrapping.")

Review Comment:
Thanks for the hint. Applied the code you suggested.
[GitHub] [kafka] nizhikov commented on a diff in pull request #12632: KAFKA-12878 Support --bootstrap-server in kafka-streams-application-reset tool
nizhikov commented on code in PR #12632:
URL: https://github.com/apache/kafka/pull/12632#discussion_r970460345

## docs/streams/developer-guide/app-reset-tool.html: ##
@@ -84,9 +84,11 @@ Step 1: Run the application reset tool
[GitHub] [kafka] nizhikov commented on pull request #12632: KAFKA-12878 Support --bootstrap-server in kafka-streams-application-reset tool
nizhikov commented on PR #12632:
URL: https://github.com/apache/kafka/pull/12632#issuecomment-1246427376

@C0urante Thanks for the review. Fixed all your comments.

> Should we also update the system test

I doubt it.

a. We have compatibility tests (which do not use the streams resetter at the moment); see [streams_broker_compatibility_test.py](https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py#L82). So it is possible that in the future we will try to use `kafka-streams-application-reset.sh` from previously released Kafka versions, which know nothing about the new parameter.
b. At the moment I don't have a test environment to check changes in system tests :).
[GitHub] [kafka] kumarpritam863 closed pull request #12622: Update WorkerSinkTask.java
kumarpritam863 closed pull request #12622: Update WorkerSinkTask.java
URL: https://github.com/apache/kafka/pull/12622
[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar updated KAFKA-14220:
---------------------------------
    Component/s:     (was: KafkaConnect)

> "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14220
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14220
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.1
>            Reporter: Pritam Kumar
>            Priority: Major
>
> Issue:
> When partitions are revoked, the "partition count" metric is updated before the new set of assignments is applied. In the "onJoinComplete" function of the "ConsumerCoordinator" class, the "invokePartitionsRevoked" method is called before "subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As a result, the old assigned partition count is written to the metric again and again, even after later rebalances.
>
> Demo:
> Suppose the current assignment is:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
>
> After that, another worker joins and, as a result, “partition-2” has to be revoked.
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
>
> The "assignment" then needs to be updated with the new assignment via the following logic:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||subscriptions.assignFromSubscribed(assignedPartitions);|
>
> But before this, "{*}updatePartitionCount{*}()" is already called via "{*}invokePartitionsRevoked{*}":
>
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));|
>
> Because of this, when it reads the "{*}assignedPartitions{*}" of the "{*}consumer{*}" via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||public Set assignment() {|
>
> the "{*}assignedPartitions{*}" is not yet updated.
>
> Solution: As part of the bug fix to KAFKA-12622, introduce code changes to update the partition count metrics after the newly assigned partitions are registered.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
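
The ordering problem described in the KAFKA-14220 report can be illustrated with a small, self-contained sketch. This is not Kafka code: the class `PartitionCountOrdering`, its fields, and both `rebalance...` methods are hypothetical stand-ins chosen for illustration, and only the idea (a count metric derived from the assignment is refreshed before the assignment itself is replaced) is taken from the report.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the ordering issue; none of these names are Kafka APIs.
public class PartitionCountOrdering {
    // Stand-in for the consumer's current assignment.
    private Set<String> assignment = new HashSet<>();
    // Stand-in for the "partition-count" metric value.
    private int partitionCountMetric = 0;

    // The metric is derived from whatever the assignment is at call time.
    private void updatePartitionCount() {
        partitionCountMetric = assignment.size();
    }

    // Order described in the report: the metric is refreshed during
    // revocation handling, before the new assignment is stored.
    public static int rebalanceMetricFirst(Set<String> owned, Set<String> assigned) {
        PartitionCountOrdering state = new PartitionCountOrdering();
        state.assignment = new HashSet<>(owned);
        state.updatePartitionCount();               // still sees the owned set
        state.assignment = new HashSet<>(assigned); // stored too late for the metric
        return state.partitionCountMetric;
    }

    // Order proposed in the report: store the new assignment first,
    // then refresh the metric.
    public static int rebalanceAssignmentFirst(Set<String> owned, Set<String> assigned) {
        PartitionCountOrdering state = new PartitionCountOrdering();
        state.assignment = new HashSet<>(owned);
        state.assignment = new HashSet<>(assigned);
        state.updatePartitionCount();               // sees the new assignment
        return state.partitionCountMetric;
    }

    public static void main(String[] args) {
        Set<String> owned = new HashSet<>(Arrays.asList("partition-1", "partition-2"));
        Set<String> assigned = new HashSet<>(Arrays.asList("partition-1"));
        System.out.println("metric first:     " + rebalanceMetricFirst(owned, assigned));
        System.out.println("assignment first: " + rebalanceAssignmentFirst(owned, assigned));
    }
}
```

With the sets from the demo in the report (owned: partition-1 and partition-2, assigned: partition-1), the first ordering leaves the modeled metric at the old count, while the second reflects the single remaining partition.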
[jira] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220 ]

Pritam Kumar deleted comment on KAFKA-14220:
--------------------------------------------

was (Author: JIRAUSER295638):
https://github.com/apache/kafka/pull/12622
[jira] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220 ]

Pritam Kumar deleted comment on KAFKA-14220:
--------------------------------------------

was (Author: JIRAUSER295638):
The following is the patch made and verified:
*https://github.com/apache/kafka/pull/12622*
[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar updated KAFKA-14220:
---------------------------------
    Labels:   (was: pull-request-available)
[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar updated KAFKA-14220:
---------------------------------
    Flags:   (was: Patch,Important)
[jira] [Closed] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar closed KAFKA-14220.

> "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
>
>                 Key: KAFKA-14220
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14220
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.1
>            Reporter: Pritam Kumar
>            Priority: Major
>
> Issue:
> When partitions are revoked, the "partition count" metric is updated before the new set of assignments is applied. In the "onJoinComplete" function of the "ConsumerCoordinator" class, "invokePartitionsRevoked" is called before "subscriptions.assignFromSubscribed(assignedPartitions)". As a result, the old assigned partition count is recorded again, and this persists even after later rebalances.
>
> Demo:
> Suppose the current assignment is:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
>
> Then another worker joins, and as a result "partition-2" has to be revoked:
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
>
> The "assignment" needs to be updated with this new assignment via the following logic:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||subscriptions.assignFromSubscribed(assignedPartitions);|
>
> But before this, "{*}updatePartitionCount{*}()" is already called via "{*}invokePartitionsRevoked{*}":
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));|
>
> Because of this, when the "{*}assignedPartitions{*}" of the "{*}consumer{*}" are read via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||public Set assignment() {|
>
> the "{*}assignedPartitions{*}" are not yet updated.
>
> Solution: As part of the bug fix for KAFKA-12622, introduce code changes to update the partition count metric after the newly assigned partitions are registered.
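The ordering problem described in this report can be sketched in isolation. The class below is a hypothetical stand-in, not the real `ConsumerCoordinator`: the names `PartitionCountOrderingSketch`, `owning`, `rebalanceMetricFirst`, and `rebalanceAssignmentFirst` are assumptions made for illustration, and the metric is modelled as a plain `int`. It only shows why refreshing a count before registering the new assignment leaves the count stale.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the report's scenario; not Kafka's actual classes.
final class PartitionCountOrderingSketch {
    private Set<String> assignment = new TreeSet<>();
    private int partitionCountMetric = 0;

    // Build a consumer model that already owns the given partitions.
    static PartitionCountOrderingSketch owning(String... partitions) {
        PartitionCountOrderingSketch sketch = new PartitionCountOrderingSketch();
        sketch.assignFromSubscribed(new TreeSet<>(Arrays.asList(partitions)));
        sketch.updatePartitionCount();
        return sketch;
    }

    private void assignFromSubscribed(Set<String> assigned) {
        assignment = new TreeSet<>(assigned);
    }

    private void updatePartitionCount() {
        partitionCountMetric = assignment.size();
    }

    // Order described in the report: metric refreshed before the new assignment is registered.
    int rebalanceMetricFirst(Set<String> assigned) {
        updatePartitionCount();
        assignFromSubscribed(assigned);
        return partitionCountMetric;
    }

    // Order proposed in the report: register the assignment, then refresh the metric.
    int rebalanceAssignmentFirst(Set<String> assigned) {
        assignFromSubscribed(assigned);
        updatePartitionCount();
        return partitionCountMetric;
    }

    public static void main(String[] args) {
        Set<String> next = new TreeSet<>(Arrays.asList("partition-1"));
        System.out.println(owning("partition-1", "partition-2").rebalanceMetricFirst(next));     // prints 2
        System.out.println(owning("partition-1", "partition-2").rebalanceAssignmentFirst(next)); // prints 1
    }
}
```

With the metric-first order the count still reflects both partitions after partition-2 is revoked, which matches the symptom in the issue title.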
[jira] [Resolved] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar resolved KAFKA-14220.

    Reviewer:   (was: Chris Egerton)
  Resolution: Abandoned

> "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
>
>                 Key: KAFKA-14220
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14220
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.1
>            Reporter: Pritam Kumar
>            Priority: Major
[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pritam Kumar updated KAFKA-14220: - Description: (was: Issue: In case of the revocation of partitions, the updation of "partition count" metrics is being done before updating the new set of assignments. "invokePartitionsRevoked" method of "onJoinComplete" function of "ConsumerCoordinator" class is being called before the " subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As a result of which the old assigned partition count is getting updated again and again even after future rebalances. Demo: Suppose the current assignment is like: Assigned partitions: [partition-1, partition-2] Current owned partitions: [] Added partitions (assigned - owned): [partition-1, partition-2] Revoked partitions (owned - assigned): [] After that some other worker joined and part of that, as a result of which “partition-2” has to be revoked. Assigned partitions: [partition-1] Current owned partitions: [partition-1, partition-2] Added partitions (assigned - owned): [] Revoked partitions (owned - assigned): [partition-2] But as the "assignment" need to be updated with these new assignment via the following logic: [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463] Line 463 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498] ||subscriptions.assignFromSubscribed(assignedPartitions);| But before this only "{*}updatePartitionCount{*}()" is getting called via "{*}invokePartitionsRevoked{*}": 
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443] Line 443 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498] ||firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));| Due to this when it is going to call for the "{*}assignedPartitions{*}" of "{*}consumer{*}" via the following: [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892] Line 892 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498] ||public Set assignment() {| the "{*}assignedPartitions{*}" is not yet updated. Solution: As part of the bug fix to KAFKA-12622 introducing code changes to update the partition count metrics after the the newly assigned partition are registered.) > "partition-count" not getting updated after revocation of partitions in case > of Incremental Co-operative rebalancing. > - > > Key: KAFKA-14220 > URL: https://issues.apache.org/jira/browse/KAFKA-14220 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.1 >Reporter: Pritam Kumar >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] naanagon commented on pull request #11516: MINOR: Use MessageDigest equals when comparing signature
naanagon commented on PR #11516:
URL: https://github.com/apache/kafka/pull/11516#issuecomment-1246533634

@divijvaidya Thank you for your suggestion. Noted.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] cadonna opened a new pull request, #12638: Register and unregister changelog topics in state updater
cadonna opened a new pull request, #12638:
URL: https://github.com/apache/kafka/pull/12638

Registering and unregistering the changelog topics in the changelog reader outside of the state updater leads to race conditions between the stream thread and the state updater thread. Thus, this PR moves registering and unregistering of changelog topics in the changelog reader into the state updater if the state updater is enabled.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cadonna commented on pull request #12638: Register and unregister changelog topics in state updater
cadonna commented on PR #12638:
URL: https://github.com/apache/kafka/pull/12638#issuecomment-1246558345

Call for review: @wcarlson5 @lihaosky
[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970625278

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -98,7 +99,7 @@ public void run() {
             while (isRunning.get()) {
                 try {
                     runOnce();
-                } catch (final InterruptedException interruptedException) {
+                } catch (final InterruptedException | InterruptException interruptedException) {

Review Comment:
   The restore consumer might throw an `InterruptException` when the state updater is shut down.
[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970626576

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -262,19 +263,19 @@ private List getTasksAndActions() {
         private void addTask(final Task task) {
             if (isStateless(task)) {
                 addToRestoredTasks((StreamTask) task);
-                log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+                log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");

Review Comment:
   I changed a couple of log messages from debug to info to better track tasks.
[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970627977

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -406,17 +412,21 @@ private void maybeCheckpointUpdatingTasks(final long now) {
     private StateUpdaterThread stateUpdaterThread = null;
     private CountDownLatch shutdownGate;
-    public DefaultStateUpdater(final StreamsConfig config,
+    private String name;
+
+    public DefaultStateUpdater(final String name,

Review Comment:
   Added a name for the state updater to improve log messages.
[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970629341

## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java: ##
@@ -575,8 +584,10 @@ public void close() throws ProcessorStateException {
     void recycle() {
         log.debug("Recycling state for {} task {}.", taskType, taskId);
-        final List allChangelogs = getAllChangelogTopicPartitions();
-        changelogReader.unregister(allChangelogs);
+        if (!stateUpdaterEnabled) {
+            final List allChangelogs = getAllChangelogTopicPartitions();
+            changelogReader.unregister(allChangelogs);
+        }

Review Comment:
   Note that once we have only the state updater, recycling a state manager becomes a no-op.
[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970630607

## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java: ##
@@ -536,7 +543,9 @@ public void flushCache() {
     public void close() throws ProcessorStateException {
         log.debug("Closing its state manager and all the registered state stores: {}", stores);
-        changelogReader.unregister(getAllChangelogTopicPartitions());
+        if (!stateUpdaterEnabled) {
+            changelogReader.unregister(getAllChangelogTopicPartitions());
+        }

Review Comment:
   Only unregister changelogs if the state updater is disabled.
[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970630206

## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java: ##
@@ -209,7 +212,9 @@ void registerStateStores(final List allStores, final InternalProcess
         processorContext.uninitialize();
         for (final StateStore store : allStores) {
             if (stores.containsKey(store.name())) {
-                maybeRegisterStoreWithChangelogReader(store.name());
+                if (!stateUpdaterEnabled) {
+                    maybeRegisterStoreWithChangelogReader(store.name());
+                }

Review Comment:
   Only register changelogs if the state updater is disabled.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java: ##
@@ -352,7 +357,9 @@ public void registerStore(final StateStore store,
         // on the state manager this state store would be closed as well
         stores.put(storeName, storeMetadata);
-        maybeRegisterStoreWithChangelogReader(storeName);
+        if (!stateUpdaterEnabled) {
+            maybeRegisterStoreWithChangelogReader(storeName);
+        }

Review Comment:
   Only register changelogs if the state updater is disabled.
[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970631567

## streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java: ##
@@ -31,6 +33,8 @@ public interface ChangelogRegister {
      */
     void register(final TopicPartition partition, final ProcessorStateManager stateManager);
+    void register(final Set partition, final ProcessorStateManager stateManager);

Review Comment:
   This is not strictly needed, but I thought it makes registering and unregistering a bit more symmetric.
[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970637017

## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ##
@@ -169,7 +169,7 @@ public void addPendingActiveTaskToSuspend(final TaskId taskId) {
     private boolean containsTaskIdWithAction(final TaskId taskId, final Action action) {
         final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.get(taskId);
-        return !(pendingUpdateAction == null || pendingUpdateAction.getAction() != action);
+        return pendingUpdateAction != null && pendingUpdateAction.getAction() == action;

Review Comment:
   Follow up from https://github.com/apache/kafka/pull/12600#discussion_r970078065
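The one-line change in this hunk is an application of De Morgan's law: `!(a == null || b != x)` is equivalent to `a != null && b == x`. A small self-contained check of that equivalence is sketched below. It simplifies the original by treating the pending action itself as a nullable enum value rather than a `PendingUpdateAction` object, and the names `PendingActionCheckSketch`, `negatedForm`, `directForm`, and `formsAgree` are made up for this illustration.

```java
// Simplified check that the old and new boolean expressions agree on all inputs.
final class PendingActionCheckSketch {
    enum Action { RECYCLE, SUSPEND, CLOSE_CLEAN }

    // Shape of the expression before the change.
    static boolean negatedForm(Action pending, Action wanted) {
        return !(pending == null || pending != wanted);
    }

    // Shape of the expression after the change.
    static boolean directForm(Action pending, Action wanted) {
        return pending != null && pending == wanted;
    }

    // Exhaustively compare both forms, including the "no pending action" (null) case.
    static boolean formsAgree() {
        Action[] pendingValues = {null, Action.RECYCLE, Action.SUSPEND, Action.CLOSE_CLEAN};
        for (Action pending : pendingValues) {
            for (Action wanted : Action.values()) {
                if (negatedForm(pending, wanted) != directForm(pending, wanted)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(formsAgree()); // prints true
    }
}
```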
[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r970637424

## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ##
@@ -103,42 +101,75 @@ public void addPendingStandbyTasksToCreate(final Map
     @Override
     public Set removePendingTaskToRecycle(final TaskId taskId) {
-        return pendingTasksToRecycle.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.RECYCLE)) {
+            return pendingUpdateActions.remove(taskId).getInputPartitions();
+        }
+        return null;
     }

     @Override
     public void addPendingTaskToRecycle(final TaskId taskId, final Set inputPartitions) {
-        pendingTasksToRecycle.put(taskId, inputPartitions);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createRecycleTask(inputPartitions));
     }

     @Override
     public Set removePendingTaskToUpdateInputPartitions(final TaskId taskId) {
-        return pendingTasksToUpdateInputPartitions.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.UPDATE_INPUT_PARTITIONS)) {
+            return pendingUpdateActions.remove(taskId).getInputPartitions();
+        }
+        return null;
     }

     @Override
     public void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final Set inputPartitions) {
-        pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createUpdateInputPartition(inputPartitions));
     }

     @Override
     public boolean removePendingTaskToCloseDirty(final TaskId taskId) {
-        return pendingTasksToCloseDirty.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.CLOSE_DIRTY)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
     }

     @Override
     public void addPendingTaskToCloseDirty(final TaskId taskId) {
-        pendingTasksToCloseDirty.add(taskId);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseDirty());
     }

     @Override
     public boolean removePendingTaskToCloseClean(final TaskId taskId) {
-        return pendingTasksToCloseClean.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.CLOSE_CLEAN)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
     }

     @Override
     public void addPendingTaskToCloseClean(final TaskId taskId) {
-        pendingTasksToCloseClean.add(taskId);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseClean());
+    }
+
+    @Override
+    public boolean removePendingActiveTaskToSuspend(final TaskId taskId) {
+        if (containsTaskIdWithAction(taskId, Action.SUSPEND)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void addPendingActiveTaskToSuspend(final TaskId taskId) {
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createSuspend());
+    }
+
+    private boolean containsTaskIdWithAction(final TaskId taskId, final Action action) {
+        final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.get(taskId);
+        return !(pendingUpdateAction == null || pendingUpdateAction.getAction() != action);

Review Comment:
   Done in https://github.com/apache/kafka/pull/12638
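The diff above replaces several per-action collections with a single `pendingUpdateActions` map, so a task carries at most one pending action and a removal only succeeds when the stored action matches. The sketch below illustrates that pattern only. It is not the PR's code: task IDs are plain strings, input partitions are left out, and the standard `Map.remove(key, value)` is swapped in for the PR's `containsTaskIdWithAction` helper. The names `PendingUpdateActionsSketch`, `addPending`, and `removePending` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified single-map registry: one pending action per task ID.
final class PendingUpdateActionsSketch {
    enum Action { RECYCLE, CLOSE_DIRTY, CLOSE_CLEAN, SUSPEND }

    private final Map<String, Action> pendingUpdateActions = new HashMap<>();

    // A later action for the same task overwrites the earlier one.
    void addPending(String taskId, Action action) {
        pendingUpdateActions.put(taskId, action);
    }

    // Removes the entry only if the task is currently mapped to exactly this action.
    boolean removePending(String taskId, Action action) {
        return pendingUpdateActions.remove(taskId, action);
    }

    public static void main(String[] args) {
        PendingUpdateActionsSketch pending = new PendingUpdateActionsSketch();
        pending.addPending("0_1", Action.CLOSE_CLEAN);
        pending.addPending("0_1", Action.SUSPEND); // overwrites CLOSE_CLEAN
        System.out.println(pending.removePending("0_1", Action.CLOSE_CLEAN)); // prints false
        System.out.println(pending.removePending("0_1", Action.SUSPEND));     // prints true
        System.out.println(pending.removePending("0_1", Action.SUSPEND));     // prints false
    }
}
```

One consequence of the single map, visible in the usage above, is that two different pending actions can no longer coexist for the same task.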
[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604021#comment-17604021 ]

Nicholas Telford commented on KAFKA-10635:

[~guozhang] both my Kafka brokers and all clients run 3.2.0. The original issue reporter is running 2.5.1

> Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
>
>                 Key: KAFKA-10635
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10635
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer
>    Affects Versions: 2.5.1
>            Reporter: Peeraya Maetasatidsuk
>            Priority: Blocker
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a rolling restart of the brokers after installing the new version. After the restarts we notice one of our streams app (client version 2.4.1) fails with OutOfOrderSequenceException:
>
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected error. Record: a_record, destination topic: topic-name-Aggregation-repartition
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the following error:
> org.apache.kafka.streams.errors.StreamsException: task [1_39] Abort sending since an error caught with a previous record (timestamp 1602654659000) to topic topic-name-Aggregation-repartition due to org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number.
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
>     at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
>     at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730)
>     at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716)
>     at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
>     at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
>     at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
>     at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number.
> {code}
> We see a corresponding error on the broker side:
> {code:java}
> [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error processing append operation on partition topic-name-Aggregation-repartition-52 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producerId 2819098 at offset 1156041 in partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), -1 (current end sequence number)
> {code}
> We are able to reproduce this many times and it happens regardless of whether the broker shutdown (at restart) is clean or unclean. However, when we rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling restarts, we don't see this error on the streams application at all. This is blocking us from upgrading our broker version.
[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation
cadonna commented on code in PR #12600: URL: https://github.com/apache/kafka/pull/12600#discussion_r970668525 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final Map> activeTasksToCreate, - final Map> standbyTasksToCreate, - final Map> tasksToRecycle, - final Set tasksToCloseClean) { +private void handleTasksWithStateUpdater(final Map> activeTasksToCreate, + final Map> standbyTasksToCreate, + final Map> tasksToRecycle, + final Set tasksToCloseClean) { +handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); +handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate); +} + +private void handleRunningAndSuspendedTasks(final Map> activeTasksToCreate, +final Map> standbyTasksToCreate, +final Map> tasksToRecycle, +final Set tasksToCloseClean) { for (final Task task : tasks.allTasks()) { +if (!task.isActive()) { +throw new IllegalStateException("Standby tasks should only be managed by the state updater"); +} final TaskId taskId = task.id(); if (activeTasksToCreate.containsKey(taskId)) { -if (task.isActive()) { -final Set topicPartitions = activeTasksToCreate.get(taskId); -if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) { -task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id())); -} -task.resume(); -} else { -throw new IllegalStateException("Standby tasks should only be managed by the state updater"); -} +handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId)); activeTasksToCreate.remove(taskId); } else if (standbyTasksToCreate.containsKey(taskId)) { -if (!task.isActive()) { -throw new IllegalStateException("Standby tasks should only be managed by the state updater"); -} else { -tasksToRecycle.put(task, standbyTasksToCreate.get(taskId)); -} +tasksToRecycle.put(task, standbyTasksToCreate.get(taskId)); standbyTasksToCreate.remove(taskId); 
} else { tasksToCloseClean.add(task); } } } -private void classifyTasksWithStateUpdater(final Map> activeTasksToCreate, - final Map> standbyTasksToCreate, - final Map> tasksToRecycle, - final Set tasksToCloseClean) { -classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); +private void handleReAssignedActiveTask(final Task task, +final Set inputPartitions) { +if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) { +task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); +} +if (task.state() == State.SUSPENDED) { +task.resume(); +moveTaskFromTasksRegistryToStateUpdater(task); +} Review Comment: Could you elaborate why a suspended task cannot be reassigned as active with the cooperative assignor? Is this guaranteed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11516: MINOR: Use MessageDigest equals when comparing signature
showuon commented on PR #11516: URL: https://github.com/apache/kafka/pull/11516#issuecomment-1246713341

Failed tests are unrelated:
```
Build / JDK 8 and Scala 2.12 / integration.kafka.server.FetchRequestBetweenDifferentIbpTest.testControllerNewToOldIBP()
Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testAllTopicPartition, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testPathToJsonFile, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testTopicPartition, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testAllTopicPartition, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testTopicPartition, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / kafka.server.ReplicaManagerTest.[1] usesTopicIds=true
```
[GitHub] [kafka] showuon merged pull request #11516: MINOR: Use MessageDigest equals when comparing signature
showuon merged PR #11516: URL: https://github.com/apache/kafka/pull/11516
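The diff for this PR is not part of this digest, so the following is only a sketch of the idea named in the PR title: comparing a computed signature with a presented one through `MessageDigest.isEqual` instead of a hand-written or `Arrays.equals` comparison. The class name, the helper methods, and the choice of HMAC-SHA256 are assumptions made for illustration and are not taken from the Kafka code base.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class SignatureCompareSketch {
    /** Computes an HMAC-SHA256 signature; the algorithm is an assumption for this sketch. */
    static byte[] sign(byte[] key, String payload) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(payload.getBytes(StandardCharsets.UTF_8));
    }

    /** Recomputes the expected signature and compares it with MessageDigest.isEqual. */
    static boolean signatureMatches(byte[] key, String payload, byte[] presented) throws Exception {
        return MessageDigest.isEqual(sign(key, payload), presented);
    }

    public static void main(String[] args) throws Exception {
        byte[] key = "demo-key".getBytes(StandardCharsets.UTF_8);
        byte[] good = sign(key, "payload");
        byte[] bad = good.clone();
        bad[0] ^= 0x01; // flip one bit so the signatures differ
        System.out.println(signatureMatches(key, "payload", good));
        System.out.println(signatureMatches(key, "payload", bad));
    }
}
```

In current JDKs the documentation of `MessageDigest.isEqual` notes that its running time depends on the length of the input rather than on the position of the first differing byte, which is the usual motivation for preferring it when comparing secrets.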
[jira] [Updated] (KAFKA-7109) KafkaConsumer should close its incremental fetch sessions on close
[ https://issues.apache.org/jira/browse/KAFKA-7109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-7109: - Fix Version/s: 3.3.1 > KafkaConsumer should close its incremental fetch sessions on close > -- > > Key: KAFKA-7109 > URL: https://issues.apache.org/jira/browse/KAFKA-7109 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Divij Vaidya >Priority: Minor > Labels: new-consumer-threading-should-fix > Fix For: 3.4.0, 3.3.1 > > > KafkaConsumer should close its incremental fetch sessions on close. > Currently, the sessions are not closed, but simply time out once the consumer > is gone. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14233) Jenkins build failed with task ':core:unitTest' exit value 1
Luke Chen created KAFKA-14233:

Summary: Jenkins build failed with task ':core:unitTest' exit value 1
Key: KAFKA-14233
URL: https://issues.apache.org/jira/browse/KAFKA-14233
Project: Kafka
Issue Type: Test
Reporter: Luke Chen
Assignee: Luke Chen

Failed messages look like this:
{code:java}
FAILURE: Build failed with an exception.
[2022-09-14T09:51:52.190Z]
[2022-09-14T09:51:52.190Z] * What went wrong:
[2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'.
[2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with non-zero exit value 1
[2022-09-14T09:51:52.190Z] This problem might be caused by incorrect test process configuration.
[2022-09-14T09:51:52.190Z] Please refer to the test execution section in the User Manual at https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution
[2022-09-14T09:51:52.190Z]
[2022-09-14T09:51:52.190Z] * Try:
[2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack trace.
[2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more log output.
[2022-09-14T09:51:52.190Z] > Run with --scan to get full insights.
[2022-09-14T09:51:52.190Z]
[2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org
[2022-09-14T09:51:52.190Z]
[2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code}
[jira] [Updated] (KAFKA-14233) Jenkins build failed with task ':core:unitTest' exit value 1
[ https://issues.apache.org/jira/browse/KAFKA-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-14233: -- Description: Recently, we got a lot of build failed (and terminated) with Failed messages look like this: {code:java} FAILURE: Build failed with an exception. [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] * What went wrong: [2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'. [2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with non-zero exit value 1 [2022-09-14T09:51:52.190Z] This problem might be caused by incorrect test process configuration. [2022-09-14T09:51:52.190Z] Please refer to the test execution section in the User Manual at https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] * Try: [2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack trace. [2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more log output. [2022-09-14T09:51:52.190Z] > Run with --scan to get full insights. [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code} was: Failed messages look like this: {code:java} FAILURE: Build failed with an exception. [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] * What went wrong: [2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'. [2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with non-zero exit value 1 [2022-09-14T09:51:52.190Z] This problem might be caused by incorrect test process configuration. 
[2022-09-14T09:51:52.190Z] Please refer to the test execution section in the User Manual at https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] * Try: [2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack trace. [2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more log output. [2022-09-14T09:51:52.190Z] > Run with --scan to get full insights. [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code} > Jenkins build failed with task ':core:unitTest' exit value 1 > > > Key: KAFKA-14233 > URL: https://issues.apache.org/jira/browse/KAFKA-14233 > Project: Kafka > Issue Type: Test >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > Recently, we got a lot of build failed (and terminated) with Failed messages > look like this: > {code:java} > FAILURE: Build failed with an exception. > [2022-09-14T09:51:52.190Z] > [2022-09-14T09:51:52.190Z] * What went wrong: > [2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'. > [2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with > non-zero exit value 1 > [2022-09-14T09:51:52.190Z] This problem might be caused by incorrect test > process configuration. > [2022-09-14T09:51:52.190Z] Please refer to the test execution section in > the User Manual at > https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution > [2022-09-14T09:51:52.190Z] > [2022-09-14T09:51:52.190Z] * Try: > [2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack > trace. > [2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more > log output. > [2022-09-14T09:51:52.190Z] > Run with --scan to get full insights. 
> [2022-09-14T09:51:52.190Z] > [2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org > [2022-09-14T09:51:52.190Z] > [2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code}
[jira] [Updated] (KAFKA-14233) Jenkins build failed with task ':core:unitTest' exit value 1
[ https://issues.apache.org/jira/browse/KAFKA-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-14233: -- Description: Recently, we got a lot of build failed (and terminated) with core:unitTest failure. The failed messages look like this: {code:java} FAILURE: Build failed with an exception. [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] * What went wrong: [2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'. [2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with non-zero exit value 1 [2022-09-14T09:51:52.190Z] This problem might be caused by incorrect test process configuration. [2022-09-14T09:51:52.190Z] Please refer to the test execution section in the User Manual at https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] * Try: [2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack trace. [2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more log output. [2022-09-14T09:51:52.190Z] > Run with --scan to get full insights. [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code} was: Recently, we got a lot of build failed (and terminated) with Failed messages look like this: {code:java} FAILURE: Build failed with an exception. [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] * What went wrong: [2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'. [2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with non-zero exit value 1 [2022-09-14T09:51:52.190Z] This problem might be caused by incorrect test process configuration. 
[2022-09-14T09:51:52.190Z] Please refer to the test execution section in the User Manual at https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] * Try: [2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack trace. [2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more log output. [2022-09-14T09:51:52.190Z] > Run with --scan to get full insights. [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org [2022-09-14T09:51:52.190Z] [2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code} > Jenkins build failed with task ':core:unitTest' exit value 1 > > > Key: KAFKA-14233 > URL: https://issues.apache.org/jira/browse/KAFKA-14233 > Project: Kafka > Issue Type: Test >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > Recently, we got a lot of build failed (and terminated) with core:unitTest > failure. The failed messages look like this: > {code:java} > FAILURE: Build failed with an exception. > [2022-09-14T09:51:52.190Z] > [2022-09-14T09:51:52.190Z] * What went wrong: > [2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'. > [2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with > non-zero exit value 1 > [2022-09-14T09:51:52.190Z] This problem might be caused by incorrect test > process configuration. > [2022-09-14T09:51:52.190Z] Please refer to the test execution section in > the User Manual at > https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution > [2022-09-14T09:51:52.190Z] > [2022-09-14T09:51:52.190Z] * Try: > [2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack > trace. > [2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more > log output. > [2022-09-14T09:51:52.190Z] > Run with --scan to get full insights. 
> [2022-09-14T09:51:52.190Z] > [2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org > [2022-09-14T09:51:52.190Z] > [2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code}
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r970792491 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamSelfJoin implements ProcessorSupplier { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class); + +private final String windowName; +private final long joinThisBeforeMs; +private final long joinThisAfterMs; +private final long joinOtherBeforeMs; +private final long joinOtherAfterMs; +private final ValueJoinerWithKey joinerThis; + +private final TimeTracker sharedTimeTracker; + +KStreamKStreamSelfJoin( +final String windowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joinerThis, +final TimeTracker sharedTimeTracker) { + +this.windowName = windowName; +this.joinThisBeforeMs = windows.beforeMs; +this.joinThisAfterMs = windows.afterMs; +this.joinOtherBeforeMs = windows.afterMs; +this.joinOtherAfterMs = windows.beforeMs; +this.joinerThis = joinerThis; +this.sharedTimeTracker = sharedTimeTracker; +} + +@Override +public Processor get() { +return new KStreamKStreamSelfJoinProcessor(); +} + +private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor { +private WindowStore windowStore; +private Sensor droppedRecordsSensor; + +@Override +public void init(final ProcessorContext context) { +super.init(context); + +final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); +droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); +windowStore = context.getStateStore(windowName); +} + +@SuppressWarnings("unchecked") +@Override +public void process(final Record record) { +if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { +return; +} + +final long inputRecordTimestamp = record.timestamp(); +long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs); +long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs); +boolean emittedJoinWithSelf = false; +final Record selfRecord = record +.withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value())) +.withTimestamp(inputRecordTimestamp); +sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); + +// Join current record with other +try (final WindowStoreIterator iter = windowStore.fetch( +record.key(), timeFrom, timeTo)) { +while (iter.hasNext()) { +final KeyValue otherRecord = iter.next(); +final long otherRecordTimestamp = otherRecord.key; + +// Join this with other +context().forward( +record
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r970800187 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamSelfJoin implements ProcessorSupplier { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class); + +private final String windowName; +private final long joinThisBeforeMs; +private final long joinThisAfterMs; +private final long joinOtherBeforeMs; +private final long joinOtherAfterMs; +private final ValueJoinerWithKey joinerThis; + +private final TimeTracker sharedTimeTracker; + +KStreamKStreamSelfJoin( +final String windowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joinerThis, +final TimeTracker sharedTimeTracker) { + +this.windowName = windowName; +this.joinThisBeforeMs = windows.beforeMs; +this.joinThisAfterMs = windows.afterMs; +this.joinOtherBeforeMs = windows.afterMs; +this.joinOtherAfterMs = windows.beforeMs; +this.joinerThis = joinerThis; +this.sharedTimeTracker = sharedTimeTracker; +} + +@Override +public Processor get() { +return new KStreamKStreamSelfJoinProcessor(); +} + +private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor { +private WindowStore windowStore; +private Sensor droppedRecordsSensor; + +@Override +public void init(final ProcessorContext context) { +super.init(context); + +final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); +droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); +windowStore = context.getStateStore(windowName); +} + +@SuppressWarnings("unchecked") +@Override +public void process(final Record record) { +if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { +return; +} + +final long inputRecordTimestamp = record.timestamp(); +long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs); Review Comment: They are not the same: the first uses the `joinThisBeforeMs` and the second uses the `joinOtherBeforeMs`. This is needed as the inner join (for a reason I have not understood cc @vvcephei ) uses different intervals when fetching rows from the window store based on whether it is the left or right-hand side. Since we want the self-join to match the output of the inner-join, I followed the same logic.
[GitHub] [kafka] showuon opened a new pull request, #12639: KAFKA-14233: do not init managers twice to avoid resource leak
showuon opened a new pull request, #12639: URL: https://github.com/apache/kafka/pull/12639

Recently, many builds have failed (and been terminated) with a `core:unitTest` failure. The failure messages look like this:
```
FAILURE: Build failed with an exception.
[2022-09-14T09:51:52.190Z]
[2022-09-14T09:51:52.190Z] * What went wrong:
[2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'.
[2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with non-zero exit value 1
```
After investigating, I found one cause (there may be others). In the `BrokerMetadataPublisherTest#testReloadUpdatedFilesWithoutConfigChange` test, we create the logManager twice but close only one of them during cleanup, so a log cleaner keeps running. Meanwhile, the temp log dirs are deleted, so `Exit.halt(1)` is called and we get the error seen in Gradle. This is what the following code does when we encounter an IOException in all our log dirs:
```
fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
Exit.halt(1)
```
Fixed by disabling the `_firstPublish` flag for the mock publisher, to avoid the resource leak.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
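The leak described in this PR can be sketched in a few lines of plain Java. Everything below is hypothetical and only models the pattern: `Manager` stands in for a component that owns background work (such as a log manager with its cleaner), `Publisher` stands in for a publisher that initializes a manager on its first publish, and the `firstPublish` field loosely mirrors the `_firstPublish` flag named above. It is not the Kafka implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class FirstPublishSketch {
    /** Counts managers that were started but not yet closed. */
    static final AtomicInteger OPEN_MANAGERS = new AtomicInteger();

    /** Hypothetical stand-in for a manager that owns background threads. */
    static final class Manager implements AutoCloseable {
        Manager() { OPEN_MANAGERS.incrementAndGet(); }
        @Override public void close() { OPEN_MANAGERS.decrementAndGet(); }
    }

    /** Hypothetical publisher that starts a manager on its first publish. */
    static final class Publisher {
        private boolean firstPublish;

        Publisher(boolean firstPublish) { this.firstPublish = firstPublish; }

        void publish() {
            if (firstPublish) {
                new Manager(); // started here, but the test never closes this instance
                firstPublish = false;
            }
        }
    }

    /** Simulates one test run and returns how many managers are still open afterwards. */
    static int leakedManagers(boolean firstPublish) {
        OPEN_MANAGERS.set(0);
        try (Manager ownedByTest = new Manager()) { // the instance the test cleans up
            new Publisher(firstPublish).publish();
        }
        return OPEN_MANAGERS.get();
    }

    public static void main(String[] args) {
        System.out.println("leaked with flag on:  " + leakedManagers(true));
        System.out.println("leaked with flag off: " + leakedManagers(false));
    }
}
```

With the flag on, the publisher starts a second manager that outlives the test's cleanup; with the flag off, only the manager owned by the test exists and it is closed normally.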
[GitHub] [kafka] showuon commented on pull request #12639: KAFKA-14233: do not init managers twice to avoid resource leak
showuon commented on PR #12639: URL: https://github.com/apache/kafka/pull/12639#issuecomment-1246762358 @hachikuji @jsancio , please take a look. Thanks.
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r970805661 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamSelfJoin implements ProcessorSupplier { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class); + +private final String windowName; +private final long joinThisBeforeMs; +private final long joinThisAfterMs; +private final long joinOtherBeforeMs; +private final long joinOtherAfterMs; +private final ValueJoinerWithKey joinerThis; + +private final TimeTracker sharedTimeTracker; + +KStreamKStreamSelfJoin( +final String windowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joinerThis, +final TimeTracker sharedTimeTracker) { + +this.windowName = windowName; +this.joinThisBeforeMs = windows.beforeMs; +this.joinThisAfterMs = windows.afterMs; +this.joinOtherBeforeMs = windows.afterMs; +this.joinOtherAfterMs = windows.beforeMs; +this.joinerThis = joinerThis; +this.sharedTimeTracker = sharedTimeTracker; +} + +@Override +public Processor get() { +return new KStreamKStreamSelfJoinProcessor(); +} + +private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor { +private WindowStore windowStore; +private Sensor droppedRecordsSensor; + +@Override +public void init(final ProcessorContext context) { +super.init(context); + +final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); +droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); +windowStore = context.getStateStore(windowName); +} + +@SuppressWarnings("unchecked") +@Override +public void process(final Record record) { +if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { +return; +} + +final long inputRecordTimestamp = record.timestamp(); +long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs); +long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs); +boolean emittedJoinWithSelf = false; +final Record selfRecord = record +.withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value())) +.withTimestamp(inputRecordTimestamp); +sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); + +// Join current record with other +try (final WindowStoreIterator iter = windowStore.fetch( +record.key(), timeFrom, timeTo)) { +while (iter.hasNext()) { +final KeyValue otherRecord = iter.next(); +final long otherRecordTimestamp = otherRecord.key; + +// Join this with other +context().forward( +record
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r970800187 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamSelfJoin implements ProcessorSupplier { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class); + +private final String windowName; +private final long joinThisBeforeMs; +private final long joinThisAfterMs; +private final long joinOtherBeforeMs; +private final long joinOtherAfterMs; +private final ValueJoinerWithKey joinerThis; + +private final TimeTracker sharedTimeTracker; + +KStreamKStreamSelfJoin( +final String windowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joinerThis, +final TimeTracker sharedTimeTracker) { + +this.windowName = windowName; +this.joinThisBeforeMs = windows.beforeMs; +this.joinThisAfterMs = windows.afterMs; +this.joinOtherBeforeMs = windows.afterMs; +this.joinOtherAfterMs = windows.beforeMs; +this.joinerThis = joinerThis; +this.sharedTimeTracker = sharedTimeTracker; +} + +@Override +public Processor get() { +return new KStreamKStreamSelfJoinProcessor(); +} + +private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor { +private WindowStore windowStore; +private Sensor droppedRecordsSensor; + +@Override +public void init(final ProcessorContext context) { +super.init(context); + +final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); +droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); +windowStore = context.getStateStore(windowName); +} + +@SuppressWarnings("unchecked") +@Override +public void process(final Record record) { +if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { +return; +} + +final long inputRecordTimestamp = record.timestamp(); +long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs); Review Comment: They are not the same: the first uses the `joinThisBeforeMs` and the second uses the `joinOtherBeforeMs`. This is needed as the inner join uses different intervals when fetching rows from the window store based on whether it is the left or right-hand side. Since we want the self-join to match the output of the inner-join, I followed the same logic.
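The asymmetry mentioned in the review comment can be illustrated with a small sketch. The arithmetic mirrors the constructor assignments in the diff (`joinOtherBeforeMs = windows.afterMs`, `joinOtherAfterMs = windows.beforeMs`) and the `timeFrom`/`timeTo` computation; the class name, the `fetchRange` helper, and the window sizes are hypothetical and chosen only for illustration.

```java
public class JoinWindowBoundsSketch {
    /** Returns the inclusive {from, to} timestamps used to fetch from the window store. */
    static long[] fetchRange(long recordTs, long beforeMs, long afterMs) {
        long from = Math.max(0L, recordTs - beforeMs);
        long to = Math.max(0L, recordTs + afterMs);
        return new long[] {from, to};
    }

    public static void main(String[] args) {
        long beforeMs = 10L; // assumed windows.beforeMs
        long afterMs = 30L;  // assumed windows.afterMs
        long ts = 100L;      // timestamp of the incoming record

        // "This" side: before/after are used as configured.
        long[] thisSide = fetchRange(ts, beforeMs, afterMs);
        // "Other" side: before/after are swapped, as in the constructor.
        long[] otherSide = fetchRange(ts, afterMs, beforeMs);

        System.out.println("this:  [" + thisSide[0] + ", " + thisSide[1] + "]");
        System.out.println("other: [" + otherSide[0] + ", " + otherSide[1] + "]");
    }
}
```

With these made-up values the two sides fetch different ranges for the same record, which is why the two `timeFrom` computations in the diff are not interchangeable unless `beforeMs` equals `afterMs`.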
[jira] [Commented] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604716#comment-17604716 ] Chris Egerton commented on KAFKA-14220: --- [~kumarpritam863] this seemed like a legitimate issue; is there a reason it's been closed and the description has been cleared? > "partition-count" not getting updated after revocation of partitions in case > of Incremental Co-operative rebalancing. > - > > Key: KAFKA-14220 > URL: https://issues.apache.org/jira/browse/KAFKA-14220 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.1 >Reporter: Pritam Kumar >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on pull request #11792: Replace EasyMock/PowerMock with Mockito in DistributedHerderTest
C0urante commented on PR #11792: URL: https://github.com/apache/kafka/pull/11792#issuecomment-1246858578 Hi @dplavcic, are you still working on this issue?
[jira] [Commented] (KAFKA-10802) Spurious log message when starting consumers
[ https://issues.apache.org/jira/browse/KAFKA-10802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604757#comment-17604757 ] François Rosière commented on KAFKA-10802: -- Issue is still present when using Kafka 3.2.1. Possible regression? Anything to do to avoid such kind of issues? {code:java} [Consumer clientId=consumer-MY_LISTENER_ID-9, groupId=MY_LISTENER_ID] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException){code} > Spurious log message when starting consumers > > > Key: KAFKA-10802 > URL: https://issues.apache.org/jira/browse/KAFKA-10802 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.6.0 >Reporter: Mickael Maison >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.7.0, 2.6.1, 2.8.0 > > > Reported by Gary Russell in the [2.6.1 RC3 vote > thread|https://lists.apache.org/thread.html/r13d2c687b2fafbe9907fceb3d4f3cc6d5b34f9f36a7fcc985c38b506%40%3Cdev.kafka.apache.org%3E] > I am seeing this on every consumer start: > 2020-11-25 13:54:34.858 INFO 42250 --- [ntainer#0-0-C-1] > o.a.k.c.c.internals.AbstractCoordinator : [Consumer > clientId=consumer-ktest26int-1, groupId=ktest26int] Rebalance failed. > org.apache.kafka.common.errors.MemberIdRequiredException: The group member > needs to have a valid member id before actually entering a consumer group. > Due to this change > https://github.com/apache/kafka/commit/16ec1793d53700623c9cb43e711f585aafd44dd4#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R468 > I understand what a MemberIdRequiredException is, but the previous (2.6.0) > log (with exception.getMessage()) didn't stand out like the new one does > because it was all on one line. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-10802) Spurious log message when starting consumers
[ https://issues.apache.org/jira/browse/KAFKA-10802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604757#comment-17604757 ] François Rosière edited comment on KAFKA-10802 at 9/14/22 2:33 PM: --- Issue is still present as an info log when using Kafka 3.2.1. Possible regression? Anything to do to avoid such kind of issues? {code:java} [Consumer clientId=consumer-MY_LISTENER_ID-9, groupId=MY_LISTENER_ID] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException){code} was (Author: JIRAUSER288866): Issue is still present when using Kafka 3.2.1. Possible regression? Anything to do to avoid such kind of issues? {code:java} [Consumer clientId=consumer-MY_LISTENER_ID-9, groupId=MY_LISTENER_ID] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException){code}
[GitHub] [kafka] C0urante merged pull request #12615: KAFKA-14132: Migrate some Connect tests from EasyMock/PowerMock to Mockito
C0urante merged PR #12615: URL: https://github.com/apache/kafka/pull/12615
[jira] [Comment Edited] (KAFKA-10802) Spurious log message when starting consumers
[ https://issues.apache.org/jira/browse/KAFKA-10802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604757#comment-17604757 ] François Rosière edited comment on KAFKA-10802 at 9/14/22 3:02 PM: --- Log is still present with info level when using Kafka 3.2.1. Possible regression? Anything to do to avoid such kind of issues? {code:java} [Consumer clientId=consumer-MY_LISTENER_ID-9, groupId=MY_LISTENER_ID] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException){code} was (Author: JIRAUSER288866): Issue is still present as an info log when using Kafka 3.2.1. Possible regression? Anything to do to avoid such kind of issues? {code:java} [Consumer clientId=consumer-MY_LISTENER_ID-9, groupId=MY_LISTENER_ID] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException){code}
[jira] [Commented] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604794#comment-17604794 ] Chris Egerton commented on KAFKA-14132: --- [~christo_lolov] Any preference on how we should indicate which tests on this list have been migrated? I've just merged https://github.com/apache/kafka/pull/12615 from [~yash.mayya], which takes care of the {{ErrorHandlingTaskTest}}, {{WorkerTaskTest}}, {{ErrorReporterTest}}, {{RetryWithToleranceOperatorTest}}, and {{WorkerErrantRecordReporterTest}} test suites. > Remaining PowerMock to Mockito tests > > > Key: KAFKA-14132 > URL: https://issues.apache.org/jira/browse/KAFKA-14132 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#de350b}Some of the tests below use EasyMock as well. For those > migrate both PowerMock and EasyMock to Mockito.{color} > Unless stated in brackets the tests are in the connect module. 
> A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened:
> # ErrorHandlingTaskTest (owner: Divij)
> # SourceTaskOffsetCommiterTest (owner: Divij)
> # WorkerMetricsGroupTest (owner: Divij)
> # WorkerSinkTaskTest (owner: Divij)
> # WorkerSinkTaskThreadedTest (owner: Divij)
> # WorkerTaskTest (owner: [~yash.mayya])
> # ErrorReporterTest (owner: [~yash.mayya])
> # RetryWithToleranceOperatorTest (owner: [~yash.mayya])
> # WorkerErrantRecordReporterTest (owner: [~yash.mayya])
> # ConnectorsResourceTest
> # StandaloneHerderTest
> # KafkaConfigBackingStoreTest
> # KafkaOffsetBackingStoreTest (owner: Christo) ([https://github.com/apache/kafka/pull/12418])
> # KafkaBasedLogTest
> # RetryUtilTest
> # RepartitionTopicTest (streams) (owner: Christo)
> # StateManagerUtilTest (streams) (owner: Christo)
> *The coverage report for the above tests after the change should be >= to what the coverage is now.*
[GitHub] [kafka] philipnee commented on pull request #12633: [Consumer Refactor] Background thread skeleton
philipnee commented on PR #12633: URL: https://github.com/apache/kafka/pull/12633#issuecomment-1246909476 cc @dajac
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation
guozhangwang commented on code in PR #12600: URL: https://github.com/apache/kafka/pull/12600#discussion_r970973727 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final Map> activeTasksToCreate, - final Map> standbyTasksToCreate, - final Map> tasksToRecycle, - final Set tasksToCloseClean) { +private void handleTasksWithStateUpdater(final Map> activeTasksToCreate, + final Map> standbyTasksToCreate, + final Map> tasksToRecycle, + final Set tasksToCloseClean) { +handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); +handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate); +} + +private void handleRunningAndSuspendedTasks(final Map> activeTasksToCreate, +final Map> standbyTasksToCreate, +final Map> tasksToRecycle, +final Set tasksToCloseClean) { for (final Task task : tasks.allTasks()) { +if (!task.isActive()) { +throw new IllegalStateException("Standby tasks should only be managed by the state updater"); +} final TaskId taskId = task.id(); if (activeTasksToCreate.containsKey(taskId)) { -if (task.isActive()) { -final Set topicPartitions = activeTasksToCreate.get(taskId); -if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) { -task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id())); -} -task.resume(); -} else { -throw new IllegalStateException("Standby tasks should only be managed by the state updater"); -} +handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId)); activeTasksToCreate.remove(taskId); } else if (standbyTasksToCreate.containsKey(taskId)) { -if (!task.isActive()) { -throw new IllegalStateException("Standby tasks should only be managed by the state updater"); -} else { -tasksToRecycle.put(task, standbyTasksToCreate.get(taskId)); -} +tasksToRecycle.put(task, standbyTasksToCreate.get(taskId)); 
standbyTasksToCreate.remove(taskId); } else { tasksToCloseClean.add(task); } } } -private void classifyTasksWithStateUpdater(final Map> activeTasksToCreate, - final Map> standbyTasksToCreate, - final Map> tasksToRecycle, - final Set tasksToCloseClean) { -classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); +private void handleReAssignedActiveTask(final Task task, +final Set inputPartitions) { +if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) { +task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); +} +if (task.state() == State.SUSPENDED) { +task.resume(); +moveTaskFromTasksRegistryToStateUpdater(task); +} Review Comment: Yes, within a single rebalance: with cooperative rebalancing, revocation and assignment happen at the same time, i.e. at the end of the rebalance, instead of revocation happening at the beginning and assignment at the end, so for a revoked partition (hence task) we know it's definitely going to be reassigned under cooperative.
[GitHub] [kafka] jsancio merged pull request #12635: MINOR: Mention that kraft is production ready in upgrade notes
jsancio merged PR #12635: URL: https://github.com/apache/kafka/pull/12635
[GitHub] [kafka] jsancio opened a new pull request, #12640: MINOR; Add missing li end tag
jsancio opened a new pull request, #12640: URL: https://github.com/apache/kafka/pull/12640 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
mumrah commented on code in PR #12636: URL: https://github.com/apache/kafka/pull/12636#discussion_r970977191 ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ## @@ -397,18 +387,34 @@ private MatchingAclRule findAclRule( return matchingAclBuilder.build(); } +/** + * Use a binary search to find the index of the first ACL which is greater than or + * equal to the given ACL. This may be equal to the end of the array if there are + * no such ACLs. + */ +private int indexOfFirstAclGreaterThanOrEqualTo(StandardAcl exemplar) { +int i = Arrays.binarySearch(acls, +new StandardAclWithId(Uuid.ZERO_UUID, exemplar), +StandardAclWithId.ACL_COMPARATOR); +// Arrays.binarySearch returns a positive number if it found an exact match, and +// a negative number otherwise. Review Comment: Might comment about what the negative return value indicates. It helps L404 make more sense :) ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ## @@ -109,14 +110,9 @@ public class StandardAuthorizerData { private final DefaultRule defaultRule; /** - * Contains all of the current ACLs sorted by (resource type, resource name). + * An immutable array of all the current ACLs sorted by (resource type, resource name). */ -private final ConcurrentSkipListSet aclsByResource; Review Comment: I see we are replacing the skip-list set with a sorted array. Don't we need to guard against duplicates in the array? 
If we used a TreeSet here, it would be closer to the current implementation and I think it should have linear time when copying from another TreeSet ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ## @@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) { } @Override -public void addAcl(Uuid id, StandardAcl acl) { -data.addAcl(id, acl); -} - -@Override -public void removeAcl(Uuid id) { -data.removeAcl(id); +public synchronized void loadSnapshot(Map acls) { +data = data.copyWithAllNewAcls(acls.entrySet()); } @Override -public synchronized void loadSnapshot(Map acls) { -data = data.copyWithNewAcls(acls.entrySet()); +public synchronized void applyAclChanges( +Collection> newAcls, Review Comment: Could we just take a `Collection` since the acl has the ID as a property? ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ## @@ -109,14 +110,9 @@ public class StandardAuthorizerData { private final DefaultRule defaultRule; /** - * Contains all of the current ACLs sorted by (resource type, resource name). + * An immutable array of all the current ACLs sorted by (resource type, resource name). */ -private final ConcurrentSkipListSet aclsByResource; - -/** - * Contains all of the current ACLs indexed by UUID. - */ -private final ConcurrentHashMap aclsById; Review Comment: Guess we don't need this anymore if we are removing `addAcl` and `removeAcl`? 
## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ## @@ -182,59 +173,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId, loadingComplete, newSuperUsers, newDefaultResult, -aclsByResource, -aclsById); +acls); } -StandardAuthorizerData copyWithNewAcls(Collection> aclEntries) { -StandardAuthorizerData newData = new StandardAuthorizerData( -log, -aclMutator, -loadingComplete, -superUsers, -defaultRule.result, -new ConcurrentSkipListSet<>(), -new ConcurrentHashMap<>()); -for (Entry entry : aclEntries) { -newData.addAcl(entry.getKey(), entry.getValue()); -} -log.info("Applied {} acl(s) from image.", aclEntries.size()); -return newData; +StandardAuthorizerData copyWithAllNewAcls( +Collection> newAclEntries +) { +return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet()); } -void addAcl(Uuid id, StandardAcl acl) { -try { -StandardAcl prevAcl = aclsById.putIfAbsent(id, acl); -if (prevAcl != null) { -throw new RuntimeException("An ACL with ID " + id + " already exists."); -} -if (!aclsByResource.add(acl)) { -aclsById.remove(id); -throw new RuntimeException("Unable to add the ACL with ID " + id + -" to aclsByResource"); -} -log.trace("Added ACL {}: {}", id, acl); -} catch (Throwable e) { -
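On the question raised above about what the negative return value indicates: per the JDK documentation, `Arrays.binarySearch` returns a non-negative index on an exact match (zero is possible, so "positive" in the quoted comment is slightly loose) and `-(insertion point) - 1` otherwise, where the insertion point is the index of the first element greater than the key, or the array length if there is none. A minimal sketch of turning that into a "first index greater than or equal to" lookup follows. It uses plain integers rather than the PR's ACL types, `LowerBound` and its method are hypothetical names, and it assumes the array holds no duplicates, since `binarySearch` does not guarantee which of several equal elements it finds.

```java
import java.util.Arrays;

// Illustrative sketch only; LowerBound is a hypothetical name.
final class LowerBound {

    // Index of the first element >= key in a sorted, duplicate-free array,
    // or sorted.length if every element is smaller than key.
    static int indexOfFirstGreaterThanOrEqualTo(final int[] sorted, final int key) {
        final int i = Arrays.binarySearch(sorted, key);
        // i >= 0: exact match at index i.
        // i <  0: i == -(insertionPoint) - 1, so insertionPoint == -i - 1.
        return i >= 0 ? i : -i - 1;
    }

    public static void main(final String[] args) {
        final int[] sorted = {10, 20, 30};
        for (final int key : new int[] {5, 10, 25, 35}) {
            System.out.println(key + " -> " + indexOfFirstGreaterThanOrEqualTo(sorted, key));
        }
    }
}
```

The duplicate-free assumption ties in with the separate review question about guarding against duplicates in the sorted array.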
[GitHub] [kafka] jsancio merged pull request #12640: MINOR; Add missing li end tag
jsancio merged PR #12640: URL: https://github.com/apache/kafka/pull/12640
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join
guozhangwang commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r971010379 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamSelfJoin implements ProcessorSupplier { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class); + +private final String windowName; +private final long joinThisBeforeMs; +private final long joinThisAfterMs; +private final long joinOtherBeforeMs; +private final long joinOtherAfterMs; +private final ValueJoinerWithKey joinerThis; + +private final TimeTracker sharedTimeTracker; + +KStreamKStreamSelfJoin( +final String windowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joinerThis, +final TimeTracker sharedTimeTracker) { + +this.windowName = windowName; +this.joinThisBeforeMs = windows.beforeMs; +this.joinThisAfterMs = windows.afterMs; +this.joinOtherBeforeMs = windows.afterMs; +this.joinOtherAfterMs = windows.beforeMs; +this.joinerThis = joinerThis; +this.sharedTimeTracker = sharedTimeTracker; +} + +@Override +public Processor get() { +return new KStreamKStreamSelfJoinProcessor(); +} + +private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor { +private WindowStore windowStore; +private Sensor droppedRecordsSensor; + +@Override +public void init(final ProcessorContext context) { +super.init(context); + +final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); +droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); +windowStore = context.getStateStore(windowName); +} + +@SuppressWarnings("unchecked") +@Override +public void process(final Record record) { +if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { +return; +} + +final long inputRecordTimestamp = record.timestamp(); +long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs); Review Comment: I see. Is that the same as: 1) loop once with the larger value of (this window, other window); 2) for each record, check if it falls in both windows or not, if it falls in both windows, we emit it twice; otherwise we emit it once. ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licens
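The single-pass alternative raised in the review question above can be sketched as follows. This only illustrates the reviewer's proposal under stated assumptions and is not the PR's implementation: stored records are reduced to bare timestamps, the two ranges are assumed to be `[ts - before, ts + after]` and `[ts - after, ts + before]` with the lower bound clamped at zero, and `SinglePassSelfJoin` and `emitCounts` are hypothetical names.

```java
// Illustrative sketch only; SinglePassSelfJoin and emitCounts are hypothetical names.
final class SinglePassSelfJoin {

    // For each stored timestamp, returns how many times a join result would be
    // emitted: 2 if it falls in both windows, 1 if in exactly one, 0 if in neither.
    static int[] emitCounts(final long[] storedTimestamps,
                            final long ts,
                            final long beforeMs,
                            final long afterMs) {
        final long thisFrom = Math.max(0L, ts - beforeMs);
        final long thisTo = ts + afterMs;
        final long otherFrom = Math.max(0L, ts - afterMs);
        final long otherTo = ts + beforeMs;

        // Step 1: a single scan over the union of both windows. Both windows
        // contain ts, so their union is one contiguous range.
        final long from = Math.min(thisFrom, otherFrom);
        final long to = Math.max(thisTo, otherTo);

        final int[] counts = new int[storedTimestamps.length];
        for (int i = 0; i < storedTimestamps.length; i++) {
            final long stored = storedTimestamps[i];
            if (stored < from || stored > to) {
                continue; // outside the union: never emitted
            }
            // Step 2: emit twice only when the record falls in both windows.
            final boolean inThis = stored >= thisFrom && stored <= thisTo;
            final boolean inOther = stored >= otherFrom && stored <= otherTo;
            counts[i] = (inThis && inOther) ? 2 : 1;
        }
        return counts;
    }

    public static void main(final String[] args) {
        final int[] counts = emitCounts(new long[] {60L, 80L, 100L, 120L, 140L}, 100L, 10L, 30L);
        System.out.println(java.util.Arrays.toString(counts));
    }
}
```

The sketch counts emissions only; whether a single pass would also reproduce the inner join's output order is a separate question that it does not settle.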
[GitHub] [kafka] jolshan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…
jolshan commented on code in PR #12392: URL: https://github.com/apache/kafka/pull/12392#discussion_r971022152 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ## @@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException { } catch (ExecutionException e) { assertTrue(e.getCause() instanceof TimeoutException); } + runUntil(commitResult::isCompleted); // the commit shouldn't be completed without being sent since the produce request failed. assertFalse(commitResult.isSuccessful()); // the commit shouldn't succeed since the produce request failed. -assertThrows(TimeoutException.class, commitResult::await); +assertThrows(KafkaException.class, commitResult::await); -assertTrue(transactionManager.hasAbortableError()); -assertTrue(transactionManager.hasOngoingTransaction()); +assertTrue(transactionManager.hasFatalBumpableError()); +assertFalse(transactionManager.hasOngoingTransaction()); assertFalse(transactionManager.isCompleting()); -assertTrue(transactionManager.transactionContainsPartition(tp0)); -TransactionalRequestResult abortResult = transactionManager.beginAbort(); - -prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch); -prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1)); -runUntil(abortResult::isCompleted); -assertTrue(abortResult.isSuccessful()); -assertFalse(transactionManager.hasOngoingTransaction()); -assertFalse(transactionManager.transactionContainsPartition(tp0)); +assertThrows(KafkaException.class, () -> transactionManager.beginAbort()); Review Comment: I was thinking that, as longer-term work, we could potentially distinguish transactions by having a bit of extra state server-side and by bumping the epoch after each transaction. But maybe this is too large a change for now. I think you also came to the conclusion of an epoch bump, but through a different path.
[GitHub] [kafka] junrao commented on a diff in pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches
junrao commented on code in PR #12570: URL: https://github.com/apache/kafka/pull/12570#discussion_r971047147 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ## @@ -1129,31 +1129,34 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { assertEquals(1, mockRandom.get()); // Produce large record, we should exceed "sticky" limit, but produce to this partition -// as we switch after the "sticky" limit is exceeded. The partition is switched after -// we produce. +// as we try to switch after the "sticky" limit is exceeded. The switch is disabled +// because of incomplete batch. byte[] largeValue = new byte[batchSize]; accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); assertEquals(partition1, partition.get()); -assertEquals(2, mockRandom.get()); +assertEquals(1, mockRandom.get()); -// Produce large record, we should switch to next partition. +// Produce large record, we switched to next partition by previous produce, but Review Comment: To be precise, the previous produce didn't switch to the next partition. The produce of this record forces the closing of the current batch, which causes the switch to the next partition. ## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ## @@ -1129,31 +1129,34 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { assertEquals(1, mockRandom.get()); // Produce large record, we should exceed "sticky" limit, but produce to this partition -// as we switch after the "sticky" limit is exceeded. The partition is switched after -// we produce. +// as we try to switch after the "sticky" limit is exceeded. The switch is disabled +// because of incomplete batch. Review Comment: Thanks for the explanation, Artem. This makes sense to me now.
[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604838#comment-17604838 ] Guozhang Wang commented on KAFKA-10635: --- Hmm, okay, I think we'd need to reproduce this, which would help in getting a better trace on the broker side. cc [~hachikuji] In the meantime, do you happen to still have the broker-side logs for the OOOSException that was thrown? If yes, could you share them in the comments? > Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. 
> ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error: org.apache.kafka.streams.errors.StreamsException: task > [1_39] Abort sending since an error caught with a previous record (timestamp > 1602654659000) to topic topic-name-Aggregation-repartition due to > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number.at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596) > at > org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) > at > org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at > 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) > at java.base/java.lang.Thread.run(Thread.java:834)Caused by: > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > {code} > We see a corresponding error on the broker side: > {code:java} > [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error > processing append operation on partition > topic-name-Aggregation-repartition-52 > (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException: > Out of order sequence number for producerId 2819098 at offset 1156041 in > partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), > -1 (current end sequence number) > {code} > We are able to reproduce this many times and it happens regardless of whether > the broker shutdown (at restart) is clean or unclean. However, when we > rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling > restarts, we don't see this error on the streams application at all. This is > blocking us from upgrading our broker version. > -- This message was sent by Atlassian
[GitHub] [kafka] divijvaidya commented on pull request #12639: KAFKA-14233: do not init managers twice to avoid resource leak
divijvaidya commented on PR #12639:
URL: https://github.com/apache/kafka/pull/12639#issuecomment-1247027848

Wow! Great deep dive to find the root cause here @showuon 👏

I am curious: how did you narrow down that `Gradle Test Executor 128` is related to `testReloadUpdatedFilesWithoutConfigChange`? Did you check the logs, search for exception stack traces, and manage to find the `IOException`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12638: Register and unregister changelog topics in state updater
guozhangwang commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r971076310

## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java: ##
@@ -575,8 +584,10 @@ public void close() throws ProcessorStateException {
     void recycle() {
         log.debug("Recycling state for {} task {}.", taskType, taskId);
-        final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
-        changelogReader.unregister(allChangelogs);
+        if (!stateUpdaterEnabled) {
+            final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
+            changelogReader.unregister(allChangelogs);
+        }

Review Comment:
+100
[GitHub] [kafka] vpapavas opened a new pull request, #12641: KAFKA-14209 : Change Topology optimization to accept list of rules 1/3
vpapavas opened a new pull request, #12641:
URL: https://github.com/apache/kafka/pull/12641

This PR is part of a series implementing the self-join rewriting. As part of it, we decided to clean up the `TOPOLOGY_OPTIMIZATION_CONFIG` and make it a list of optimization rules. Acceptable values are `NO_OPTIMIZATION`, `OPTIMIZE` (which applies all optimization rules), or a comma-separated list of specific optimizations.

Added unit tests.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
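To illustrate the "list of optimization rules" idea described in this PR, here is a minimal sketch of how such a config value might be expanded into a set of rules. It is not the PR's implementation: the class name, the `parse` helper, the literal values `"none"` and `"all"`, and the three rule names are assumptions chosen for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not part of Kafka; all names and values are illustrative.
final class OptimizationConfigSketch {
    static final String NO_OPTIMIZATION = "none";   // assumed literal value
    static final String OPTIMIZE = "all";           // assumed literal value

    // Assumed rule names, used only to make the example concrete.
    static final Set<String> KNOWN_RULES = new LinkedHashSet<>(Arrays.asList(
        "reuse.ktable.source.topics",
        "merge.repartition.topics",
        "single.store.self.join"));

    /** Expands a config value into the set of optimization rules to apply. */
    static Set<String> parse(String value) {
        String trimmed = value.trim();
        if (trimmed.equals(NO_OPTIMIZATION)) {
            return new LinkedHashSet<>();             // no rules enabled
        }
        if (trimmed.equals(OPTIMIZE)) {
            return new LinkedHashSet<>(KNOWN_RULES);  // every known rule enabled
        }
        // Otherwise treat the value as a comma-separated list of rule names.
        Set<String> rules = new LinkedHashSet<>();
        for (String part : trimmed.split(",")) {
            String rule = part.trim();
            if (!KNOWN_RULES.contains(rule)) {
                throw new IllegalArgumentException("Unknown optimization rule: " + rule);
            }
            rules.add(rule);
        }
        return rules;
    }
}
```

With these assumed names, `OptimizationConfigSketch.parse("merge.repartition.topics, single.store.self.join")` would enable only those two rules, and an unrecognized name is rejected rather than silently ignored.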
[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971091182

## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ##
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
     }
     @Override
-    public void addAcl(Uuid id, StandardAcl acl) {
-        data.addAcl(id, acl);
-    }
-
-    @Override
-    public void removeAcl(Uuid id) {
-        data.removeAcl(id);
+    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        data = data.copyWithAllNewAcls(acls.entrySet());
     }
     @Override
-    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-        data = data.copyWithNewAcls(acls.entrySet());
+    public synchronized void applyAclChanges(
+        Collection<Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
StandardAcl doesn't contain an ID
[GitHub] [kafka] cmccabe commented on pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
cmccabe commented on PR #12636:
URL: https://github.com/apache/kafka/pull/12636#issuecomment-1247070076

Jenkins went down again. :(

```
java.nio.file.FileSystemException: /home/jenkins/workspace/Kafka_kafka-pr_PR-12636: No space left on device
```
[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971092861

## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ##
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
     private final DefaultRule defaultRule;
     /**
-     * Contains all of the current ACLs sorted by (resource type, resource name).
+     * An immutable array of all the current ACLs sorted by (resource type, resource name).
      */
-    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;

Review Comment:
TreeSets are quite slow because they have poor memory locality. They also use a lot more memory. Ideally we could use something like a BTree, but Java doesn't have those, unfortunately...
[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971094152

## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ##
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
     }
     @Override
-    public void addAcl(Uuid id, StandardAcl acl) {
-        data.addAcl(id, acl);
-    }
-
-    @Override
-    public void removeAcl(Uuid id) {
-        data.removeAcl(id);
+    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        data = data.copyWithAllNewAcls(acls.entrySet());
     }
     @Override
-    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-        data = data.copyWithNewAcls(acls.entrySet());
+    public synchronized void applyAclChanges(
+        Collection<Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
However, we could use StandardAclWithId here. That would require another copy in the case where we were loading a snapshot, so I'm not sure if it's worth it.
[jira] [Updated] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nicholas Telford updated KAFKA-10635:
---
    Attachment: logs.csv

> Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
> ---
>
>                 Key: KAFKA-10635
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10635
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer
>    Affects Versions: 2.5.1
>            Reporter: Peeraya Maetasatidsuk
>            Priority: Blocker
>         Attachments: logs.csv
>

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604862#comment-17604862 ]

Nicholas Telford commented on KAFKA-10635:
---

Hi [~guozhang], I've managed to pull some logs from a recent occurrence of this issue.

I specifically focused the logs on the partition and broker that produce the error; otherwise there would be thousands of irrelevant log messages. I've also replaced the names of the partitions in question with placeholder names ({{myapp}} and {{some-processor}}) to prevent leaking confidential information.

We use a structured logging system, so I've converted the logs to CSV. I hope you find this format easy to understand. If you feel there's information missing that would help (e.g. logger name, a broader search on the logs, etc.), let me know and I'll see what I can do.

See attached [^logs.csv]

> Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
> ---
>
>                 Key: KAFKA-10635
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10635
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer
>    Affects Versions: 2.5.1
>            Reporter: Peeraya Maetasatidsuk
>            Priority: Blocker
>         Attachments: logs.csv
>
[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971095900

## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ##
@@ -182,59 +173,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet());
     }
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + id +
-                    " to aclsByResource");
-            }
-            log.trace("Added ACL {}: {}", id, acl);
-        } catch (Throwable e) {
-            log.error("addAcl error", e);
-            throw e;
-        }
+    StandardAuthorizerData copyWithAclChanges(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        return copyWithNewAcls(acls, newAclEntries, removedAclIds);
     }
-    void removeAcl(Uuid id) {
-        try {
-            StandardAcl acl = aclsById.remove(id);
-            if (acl == null) {
-                throw new RuntimeException("ID " + id + " not found in aclsById.");
+    StandardAuthorizerData copyWithNewAcls(
+        StandardAclWithId[] existingAcls,
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        StandardAclWithId[] newAcls = new StandardAclWithId[
+            existingAcls.length + newAclEntries.size() - removedAclIds.size()];

Review Comment:
good idea
[GitHub] [kafka] artemlivshits commented on a diff in pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches
artemlivshits commented on code in PR #12570:
URL: https://github.com/apache/kafka/pull/12570#discussion_r971097010

## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ##
@@ -1129,31 +1129,34 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
         assertEquals(1, mockRandom.get());
         // Produce large record, we should exceed "sticky" limit, but produce to this partition
-        // as we switch after the "sticky" limit is exceeded. The partition is switched after
-        // we produce.
+        // as we try to switch after the "sticky" limit is exceeded. The switch is disabled
+        // because of incomplete batch.
         byte[] largeValue = new byte[batchSize];
         accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
         assertEquals(partition1, partition.get());
-        assertEquals(2, mockRandom.get());
+        assertEquals(1, mockRandom.get());
-        // Produce large record, we should switch to next partition.
+        // Produce large record, we switched to next partition by previous produce, but

Review Comment:
Updated.
[GitHub] [kafka] akhileshchg commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
akhileshchg commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971088491

## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ##
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
     private final DefaultRule defaultRule;
     /**
-     * Contains all of the current ACLs sorted by (resource type, resource name).
+     * An immutable array of all the current ACLs sorted by (resource type, resource name).
      */
-    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;

Review Comment:
We're storing the `Id` with the `StandardAcl`. Shouldn't that make it unique? Since the array is sorted, we could add a conservative check to make sure there are no duplicate ids.

## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ##
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
     }
     @Override
-    public void addAcl(Uuid id, StandardAcl acl) {
-        data.addAcl(id, acl);
-    }
-
-    @Override
-    public void removeAcl(Uuid id) {
-        data.removeAcl(id);
+    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        data = data.copyWithAllNewAcls(acls.entrySet());
     }
     @Override
-    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-        data = data.copyWithNewAcls(acls.entrySet());
+    public synchronized void applyAclChanges(
+        Collection<Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
I don't think `StandardAcl` has an id with it. We have a different data structure for that: `StandardAclWithId`.

## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ##
@@ -138,6 +139,15 @@ public List<AuthorizationResult> authorize(
         return results;
     }

Review Comment:
In the `authorize` function we still do `StandardAuthorizerData curData = data;`. I don't think we need to do this anymore.

## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ##
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet());
     }
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + id +
-                    " to aclsByResource");
-            }
-            log.trace("Added ACL {}: {}", id, acl);
-        } catch (Throwable e) {
-            log.error("addAcl error", e);
-            throw e;
-        }
+    StandardAuthorizerData copyWithAclChanges(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        return copyWithNewAcls(acls, newAclEntries, removedAclIds);
     }
-    void removeAcl(Uuid id) {
-        try {
-            StandardAcl acl = aclsById.remove(id);
-            if (acl == null) {
-                throw new RuntimeException("ID " + id + " not found in aclsById.");
+    StandardAuthorizerData copyWithNewAcls(
+        StandardAclWithId[] existingAcls,
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        StandardAclWithId[] newAcls = new StandardAclWithId[
+            existingAcls.length + newAclEntries.size() - removedAclIds.size()];
+        int numRemoved = 0, j = 0;
+        for (int i = 0; i < existingAcls.length; i++) {
+            StandardAclWithId aclWithId = existingAcls[i];
+            if (removedAclIds.contains(aclWithId.id())) {
+                numRemoved++;
+            } else {
+                newAcls[j++] = aclWithId;
             }
-            if (!aclsByResource.remove(acl)) {
-                throw new RuntimeException("Unable to remove the ACL with ID " + id +
-                    " from aclsByResource");
+        }
+        if (numRemoved < removedAclIds.size()
[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971108745

## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ##
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
     }
     @Override
-    public void addAcl(Uuid id, StandardAcl acl) {
-        data.addAcl(id, acl);
-    }
-
-    @Override
-    public void removeAcl(Uuid id) {
-        data.removeAcl(id);
+    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        data = data.copyWithAllNewAcls(acls.entrySet());
     }
     @Override
-    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-        data = data.copyWithNewAcls(acls.entrySet());
+    public synchronized void applyAclChanges(
+        Collection<Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
Oh, I must have been looking at StandardAclWithId :). This looks good as-is.
[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971116340

## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ##
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet());
     }
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + id +
-                    " to aclsByResource");
-            }

Review Comment:
I think we lost this existing ID and duplicate ACL check in the new array code.
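The copy-on-write approach under review, together with the duplicate-ID check this comment says was lost, can be sketched roughly as follows. This is a simplified illustration, not the PR's code: `AclWithId` is a hypothetical stand-in for `StandardAclWithId`, IDs are plain strings instead of `Uuid`, and the sort key is a single resource name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; the names here do not come from the Kafka code base.
final class CopyOnWriteAclsSketch {
    /** Stand-in for StandardAclWithId: an ID plus a sortable resource name. */
    static final class AclWithId {
        final String id;
        final String resource;
        AclWithId(String id, String resource) {
            this.id = id;
            this.resource = resource;
        }
    }

    /** Returns a new sorted array; the existing array is never modified. */
    static AclWithId[] copyWithChanges(AclWithId[] existing,
                                       Collection<AclWithId> added,
                                       Set<String> removedIds) {
        List<AclWithId> result = new ArrayList<>(existing.length + added.size());
        Set<String> seenIds = new HashSet<>();
        int numRemoved = 0;
        for (AclWithId acl : existing) {
            if (removedIds.contains(acl.id)) {
                numRemoved++;                 // dropped from the new copy
            } else {
                seenIds.add(acl.id);
                result.add(acl);
            }
        }
        if (numRemoved < removedIds.size()) {
            throw new IllegalStateException("Some removed ACL IDs were not found.");
        }
        for (AclWithId acl : added) {
            // Duplicate-ID check, analogous to the old putIfAbsent-based guard.
            if (!seenIds.add(acl.id)) {
                throw new IllegalStateException("An ACL with ID " + acl.id + " already exists.");
            }
            result.add(acl);
        }
        AclWithId[] out = result.toArray(new AclWithId[0]);
        // Keep the array sorted so readers can scan or binary-search it.
        Arrays.sort(out, Comparator.comparing((AclWithId a) -> a.resource)
                                   .thenComparing(a -> a.id));
        return out;
    }
}
```

Because every change produces a fresh array, a reader holding the old reference keeps seeing a consistent view; the cost is an O(n log n) rebuild per change in this sketch, which a real implementation would likely avoid by merging the already-sorted inputs.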
[GitHub] [kafka] nizhikov commented on pull request #12632: KAFKA-12878 Support --bootstrap-server in kafka-streams-application-reset tool
nizhikov commented on PR #12632:
URL: https://github.com/apache/kafka/pull/12632#issuecomment-1247109394

I've checked the test failures. They do not appear to be related to the changes.
[GitHub] [kafka] gitlw commented on pull request #12634: KAFKA-14225: Fix deadlock caused by lazy val exemptSensor
gitlw commented on PR #12634:
URL: https://github.com/apache/kafka/pull/12634#issuecomment-1247113089

rerun tests please
[GitHub] [kafka] abbccdda closed pull request #10156: KAFKA-10345: File watch store reloading
abbccdda closed pull request #10156: KAFKA-10345: File watch store reloading
URL: https://github.com/apache/kafka/pull/10156
[GitHub] [kafka] jsancio opened a new pull request, #12642: KAFKA-14207; KRaft Operations documentation
jsancio opened a new pull request, #12642:
URL: https://github.com/apache/kafka/pull/12642

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cmccabe commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation
cmccabe commented on code in PR #12642:
URL: https://github.com/apache/kafka/pull/12642#discussion_r971201651

## docs/ops.html: ##
@@ -3180,6 +3180,119 @@
 6.10 KRaft
+
+Configuration
+
+Process Roles
+
+In KRaft mode each Kafka server can be configured as a controller, as a broker or as both using the process.roles property. This property can have the following values:

Review Comment:
grammar: how about
> as a controller, a broker, or both by using the process.roles property
[GitHub] [kafka] cmccabe commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation
cmccabe commented on code in PR #12642: URL: https://github.com/apache/kafka/pull/12642#discussion_r971201992 ## docs/ops.html: ## @@ -3180,6 +3180,119 @@ 6.10 KRaft + + Configuration + + Process Roles + + In KRaft mode each Kafka server can be configured as a controller, as a broker or as both using the process.roles property. This property can have the following values: + + +If process.roles is set to broker, the server acts as a broker. +If process.roles is set to controller, the server acts as a controller. +If process.roles is set to broker,controller, the server acts as a broker and a controller. Review Comment: How about adding "both" ? > the server acts as *both* a broker and a controller.
[GitHub] [kafka] cmccabe commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation
cmccabe commented on code in PR #12642: URL: https://github.com/apache/kafka/pull/12642#discussion_r971203144 ## docs/ops.html: ## @@ -3180,6 +3180,119 @@ 6.10 KRaft + + Configuration + + Process Roles + + In KRaft mode each Kafka server can be configured as a controller, as a broker or as both using the process.roles property. This property can have the following values: + + +If process.roles is set to broker, the server acts as a broker. +If process.roles is set to controller, the server acts as a controller. +If process.roles is set to broker,controller, the server acts as a broker and a controller. +If process.roles is not set at all, it is assumed to be in ZooKeeper mode. + + + Nodes that act as both brokers and controllers are referred to as "combined" nodes. Combined nodes are simpler to operate for simple use cases like a development environment. The key disadvantage is that the controller will be less isolated from the rest of the system. Combined mode is not recommended is critical deployment environments. Review Comment: Maybe add an example of how operations are harder like "it is not possible to roll the controllers separately from the brokers when in combined mode" ?
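The four process.roles cases listed in the quoted hunk can be sketched as a small classifier. This is only an illustration of the documented rules, not Kafka's own parsing code; the class `ProcessRolesSketch`, the `Role` enum, and the mode labels are hypothetical names chosen for the example.

```java
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper for illustration; not part of the Kafka codebase.
class ProcessRolesSketch {
    enum Role { BROKER, CONTROLLER }

    // Parses a raw process.roles value such as "broker,controller".
    // An empty result stands for "process.roles is not set at all".
    static Set<Role> parse(String rawValue) {
        Set<Role> roles = EnumSet.noneOf(Role.class);
        if (rawValue == null || rawValue.trim().isEmpty()) {
            return roles;
        }
        for (String part : rawValue.split(",")) {
            // Role.valueOf throws IllegalArgumentException for unknown role names.
            roles.add(Role.valueOf(part.trim().toUpperCase(Locale.ROOT)));
        }
        return roles;
    }

    // Maps the parsed roles to the four cases described in the quoted docs.
    static String describe(Set<Role> roles) {
        if (roles.isEmpty()) {
            return "zookeeper";   // property unset: assumed ZooKeeper mode
        }
        if (roles.size() == 2) {
            return "combined";    // both broker and controller
        }
        return roles.contains(Role.BROKER) ? "broker" : "controller";
    }
}
```

For example, `ProcessRolesSketch.describe(ProcessRolesSketch.parse("broker,controller"))` yields the label for a combined node, which is the mode the quoted text advises against for critical deployments.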
[GitHub] [kafka] cmccabe commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation
cmccabe commented on code in PR #12642: URL: https://github.com/apache/kafka/pull/12642#discussion_r971203656 ## docs/ops.html: ## @@ -3180,6 +3180,119 @@ 6.10 KRaft + + Configuration + + Process Roles + + In KRaft mode each Kafka server can be configured as a controller, as a broker or as both using the process.roles property. This property can have the following values: + + +If process.roles is set to broker, the server acts as a broker. +If process.roles is set to controller, the server acts as a controller. +If process.roles is set to broker,controller, the server acts as a broker and a controller. +If process.roles is not set at all, it is assumed to be in ZooKeeper mode. + + + Nodes that act as both brokers and controllers are referred to as "combined" nodes. Combined nodes are simpler to operate for simple use cases like a development environment. The key disadvantage is that the controller will be less isolated from the rest of the system. Combined mode is not recommended is critical deployment environments. + + + Controllers + + In KRaft mode, only a small group of specially selected servers can act as controllers (unlike the ZooKeeper-based mode, where any server can become the Controller). The specially selected controller servers will participate in the metadata quorum. Each controller server is either active, or a hot standby for the current active controller server. + + A Kafka cluster will typically select 3 or 5 servers for this role, depending on factors like cost and the number of concurrent failures your system should withstand without availability impact. A majority of the controllers must be alive in order to maintain availability. With 3 controllers, the cluster can tolerate 1 controller failure; with 5 controllers, the cluster can tolerate 2 controller failures. Review Comment: How about > A Kafka admin will typically select... the Kafka cluster hasn't achieved sentience YET. 
:)
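The fault-tolerance figures in the quoted hunk (3 controllers tolerate 1 failure, 5 tolerate 2) follow from the majority requirement. A minimal sketch of that arithmetic, assuming a simple majority quorum; the class and method names are hypothetical:

```java
// Hypothetical helper for illustration; not part of the Kafka codebase.
class QuorumMathSketch {
    // Smallest number of voters that forms a strict majority.
    static int majority(int voters) {
        if (voters <= 0) {
            throw new IllegalArgumentException("voters must be positive: " + voters);
        }
        return voters / 2 + 1;
    }

    // Number of voters that can fail while a majority remains alive.
    static int toleratedFailures(int voters) {
        return voters - majority(voters);
    }
}
```

Under this rule an even voter count adds cost without adding tolerance: 4 voters still tolerate only 1 failure, which is consistent with the quoted recommendation of 3 or 5.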
[GitHub] [kafka] cmccabe commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation
cmccabe commented on code in PR #12642: URL: https://github.com/apache/kafka/pull/12642#discussion_r971204480 ## docs/ops.html: ## @@ -3180,6 +3180,119 @@ 6.10 KRaft + + Configuration + + Process Roles + + In KRaft mode each Kafka server can be configured as a controller, as a broker or as both using the process.roles property. This property can have the following values: + + +If process.roles is set to broker, the server acts as a broker. +If process.roles is set to controller, the server acts as a controller. +If process.roles is set to broker,controller, the server acts as a broker and a controller. +If process.roles is not set at all, it is assumed to be in ZooKeeper mode. + + + Nodes that act as both brokers and controllers are referred to as "combined" nodes. Combined nodes are simpler to operate for simple use cases like a development environment. The key disadvantage is that the controller will be less isolated from the rest of the system. Combined mode is not recommended is critical deployment environments. + + + Controllers + + In KRaft mode, only a small group of specially selected servers can act as controllers (unlike the ZooKeeper-based mode, where any server can become the Controller). The specially selected controller servers will participate in the metadata quorum. Each controller server is either active, or a hot standby for the current active controller server. + + A Kafka cluster will typically select 3 or 5 servers for this role, depending on factors like cost and the number of concurrent failures your system should withstand without availability impact. A majority of the controllers must be alive in order to maintain availability. With 3 controllers, the cluster can tolerate 1 controller failure; with 5 controllers, the cluster can tolerate 2 controller failures. + + All of the servers in a Kafka cluster discover the quorum voters using the controller.quorum.voters property. This identifies the quorum controller servers that should be used. 
All the controllers must be enumerated. Each controller is identified with their id, host and port information. This is an example configuration: controller.quorum.voters=id1@host1:port1,id2@host2:port2,id3@host3:port3 Review Comment: can we put the example configuration in a PRE or CODE block, or whatever, so it shows as monospace? (Just a thought)
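The id@host:port format of controller.quorum.voters described in the quoted hunk can be illustrated with a small parser. This is a sketch of the format only, not the parsing Kafka performs internally; `QuorumVotersSketch` and the sample host names and port are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Kafka codebase.
class QuorumVotersSketch {
    // Parses "id1@host1:port1,id2@host2:port2,..." into node id -> "host:port".
    static Map<Integer, String> parse(String value) {
        Map<Integer, String> voters = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] idAndEndpoint = entry.trim().split("@", 2);
            if (idAndEndpoint.length != 2) {
                throw new IllegalArgumentException("Expected id@host:port but got: " + entry);
            }
            int nodeId = Integer.parseInt(idAndEndpoint[0]);
            String endpoint = idAndEndpoint[1];
            int colon = endpoint.lastIndexOf(':');
            if (colon <= 0 || colon == endpoint.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + endpoint);
            }
            // The port must be numeric; NumberFormatException signals a malformed entry.
            Integer.parseInt(endpoint.substring(colon + 1));
            // Node IDs must be unique, mirroring the rule stated in the quoted docs.
            if (voters.putIfAbsent(nodeId, endpoint) != null) {
                throw new IllegalArgumentException("Duplicate node id: " + nodeId);
            }
        }
        return voters;
    }
}
```

Calling `QuorumVotersSketch.parse("1@host1:9093,2@host2:9093,3@host3:9093")` returns three entries keyed by node ID, and a repeated ID is rejected.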
[GitHub] [kafka] cmccabe commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation
cmccabe commented on code in PR #12642: URL: https://github.com/apache/kafka/pull/12642#discussion_r971205378 ## docs/ops.html: ## @@ -3180,6 +3180,119 @@ 6.10 KRaft + + Configuration + + Process Roles + + In KRaft mode each Kafka server can be configured as a controller, as a broker or as both using the process.roles property. This property can have the following values: + + +If process.roles is set to broker, the server acts as a broker. +If process.roles is set to controller, the server acts as a controller. +If process.roles is set to broker,controller, the server acts as a broker and a controller. +If process.roles is not set at all, it is assumed to be in ZooKeeper mode. + + + Nodes that act as both brokers and controllers are referred to as "combined" nodes. Combined nodes are simpler to operate for simple use cases like a development environment. The key disadvantage is that the controller will be less isolated from the rest of the system. Combined mode is not recommended is critical deployment environments. + + + Controllers + + In KRaft mode, only a small group of specially selected servers can act as controllers (unlike the ZooKeeper-based mode, where any server can become the Controller). The specially selected controller servers will participate in the metadata quorum. Each controller server is either active, or a hot standby for the current active controller server. + + A Kafka cluster will typically select 3 or 5 servers for this role, depending on factors like cost and the number of concurrent failures your system should withstand without availability impact. A majority of the controllers must be alive in order to maintain availability. With 3 controllers, the cluster can tolerate 1 controller failure; with 5 controllers, the cluster can tolerate 2 controller failures. + + All of the servers in a Kafka cluster discover the quorum voters using the controller.quorum.voters property. This identifies the quorum controller servers that should be used. 
All the controllers must be enumerated. Each controller is identified with their id, host and port information. This is an example configuration: controller.quorum.voters=id1@host1:port1,id2@host2:port2,id3@host3:port3 + + If the Kafka cluster has 3 controllers named controller1, controller2 and controller3 then controller3 may have the following: + + +process.roles=controller +node.id=1 +listeners=CONTROLLER://controller1.example.com:9093 +controller.quorum.voters=1...@controller1.example.com:9093,2...@controller2.example.com:9093,3...@controller3.example.com:9093 + + Every broker and controller must set the controller.quorum.voters property. The node ID supplied in the controller.quorum.voters property must match the corresponding id on the controller servers. For example, on controller1, node.id must be set to 1, and so forth. Each node ID must be unique across all the nodes in a particular cluster. No two nodes can have the same node ID regardless of their process.roles values. + + Storage Tool + + The kafka-storage.sh random-uuid command can be used to generate a cluster ID for your new cluster. This cluster ID must be used when formatting each node in the cluster with the kafka-storage.sh format command. + + This is different from how Kafka has operated in the past. Previously, Kafka would format blank storage directories automatically, and also generate a new cluster ID automatically. One reason for the change is that auto-formatting can sometimes obscure an error condition. This is particularly important for the metadata log maintained by the controller and broker servers. If a majority of the controllers were able to start with an empty log directory, a leader might be able to be elected with missing committed data. + + Debugging + + Metadata Quorum Tool + + The kafka-metadata-quorum tool can be used to describe the runtime state of the cluster metadata partition. 
For example, the following command display a summary of the metadata quorum: + +> bin/kafka-metadata-quorum.sh --bootstrap-server broker_host:port describe --status +ClusterId: fMCL8kv1SWm87L_Md-I2hg +LeaderId: 3002 +LeaderEpoch:2 +HighWatermark: 10 +MaxFollowerLag: 0 +MaxFollowerLagTimeMs: -1 +CurrentVoters: [3000,3001,3002] +CurrentObservers: [0,1,2] + + Dump Log Tool + + The kafka-dump-log tool can be used to debug the log segments and snapshots for the cluster metadata directory. The tool will scan the provided files and decode the metadata records. For example, this command decodes and prints the records in the first log segment: + +> bin/kafka-dump-log.sh --cluster-metadata-decoder --skip-record-metadat --files metadata_log_dir/__cluster_metadata-0/.log Review Comment: can we leave off `--skip-record-metadata`? I recall it making the output a bit weird. also
[GitHub] [kafka] jolshan opened a new pull request, #12643: KAFKA-14097: make producer ID expiration a dynamic config
jolshan opened a new pull request, #12643: URL: https://github.com/apache/kafka/pull/12643 Changed the integer value for producer ID expiration to a dynamically configurable value. Tested that the configuration can be changed dynamically and works as expected. I considered adding the producer expiration check interval to the config, but since it is not dynamic (and is harder to make dynamic), I decided to keep it separate. A follow-up could be to incorporate all the producer state manager configs into this new object. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
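One common way to make a single integer setting dynamically reconfigurable is to hold it in a volatile field that readers consult on every check. The sketch below illustrates that general pattern only; it is not the implementation in PR #12643, and the class name, method names, and validation rule are all hypothetical.

```java
// Hypothetical holder for illustration; not the object introduced by the PR.
class ProducerIdExpirationSketch {
    // volatile so that a value written by a reconfiguration thread is
    // visible to the threads performing expiration checks.
    private volatile int expirationMs;

    ProducerIdExpirationSketch(int initialExpirationMs) {
        reconfigure(initialExpirationMs);
    }

    // Applies a new value at runtime; assumed to reject non-positive values.
    void reconfigure(int newExpirationMs) {
        if (newExpirationMs <= 0) {
            throw new IllegalArgumentException("expiration must be positive: " + newExpirationMs);
        }
        this.expirationMs = newExpirationMs;
    }

    // Reads the current value on each call, so a reconfiguration takes
    // effect on the next check without a restart.
    boolean isExpired(long lastTimestampMs, long nowMs) {
        return nowMs - lastTimestampMs >= expirationMs;
    }
}
```

The check interval mentioned in the description would sit outside such a holder if it is fixed at startup, for instance because it is baked into a scheduled task.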
[jira] [Comment Edited] (KAFKA-6221) ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic creation
[ https://issues.apache.org/jira/browse/KAFKA-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604951#comment-17604951 ] Idrissi edited comment on KAFKA-6221 at 9/14/22 8:18 PM: - Hello , i am facing the same problem with the following message : * {{Message: Broker: Unknown topic or partition from ReplicaFetcherThread .}} {{Many topics are created with 6 partitions and replication factor 3. The problems seems to be triggered by the Replica Fetcher thread during the synchronizing with the leader . Do you think its the same problem as mentionned in this JIRA ticket ?}}{{ }} was (Author: JIRAUSER295755): Hello , i am facing the same problem with the following message : * {{Message: Broker: Unknown topic or partition from ReplicaFetcherThread .Many topics are created with 6 partitions and replication factor 3. The problems seems to be triggered by the Replica Fetcher thread during the synchronizing with the leader . Do you think its the same problem as mentionned in this JIRA ticket ?}}{{ }} > ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic > creation > --- > > Key: KAFKA-6221 > URL: https://issues.apache.org/jira/browse/KAFKA-6221 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.1.1, 0.10.2.0, 0.10.2.1, 0.11.0.1, 1.0.0 > Environment: RHEL 7 >Reporter: Alex Dunayevsky >Priority: Minor > Original Estimate: 336h > Remaining Estimate: 336h > > This issue appeared to happen frequently on 0.10.2.0. > On 0.10.2.1 and 1.0.0 it's a way harder to reproduce. > We'll focus on reproducing it on 0.10.2.1 and 1.0.0. > *TOPOLOGY:* > 3 brokers, 1 zk. > *REPRODUCING STRATEGY:* > Create a few dozens topics (say, 40) one by one, each with replication factor > 2. Number of partitions, generally, does not matter but, for easier > reproduction, should not be too small (around 30 or so). 
> *CREATE 40 TOPICS:* > {code:java} for i in {1..40}; do bin/kafka-topics.sh --create --topic > "topic${i}_p28_r2" --partitions 28 --replication-factor 2 --zookeeper :2165; > done {code} > *ERRORS* > {code:java} > *BROKER 1* > [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,27] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,27] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,9] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,9] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,3] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,3] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. 
(kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,15] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,15] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,21] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,21] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionExcep
[jira] [Comment Edited] (KAFKA-6221) ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic creation
[ https://issues.apache.org/jira/browse/KAFKA-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604951#comment-17604951 ] Idrissi edited comment on KAFKA-6221 at 9/14/22 8:18 PM: - Hello , i am facing the same problem with the following message : * {{Message: Broker: Unknown topic or partition from ReplicaFetcherThread .}} {{Many topics are created with 6 partitions and replication factor 3. The problems seems to be triggered by the Replica Fetcher thread during the synchronization process with the leader . Do you think its the same problem as mentioned in this JIRA ticket ?}}{{ }} was (Author: JIRAUSER295755): Hello , i am facing the same problem with the following message : * {{Message: Broker: Unknown topic or partition from ReplicaFetcherThread .}} {{Many topics are created with 6 partitions and replication factor 3. The problems seems to be triggered by the Replica Fetcher thread during the synchronizing with the leader . Do you think its the same problem as mentionned in this JIRA ticket ?}}{{ }} > ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic > creation > --- > > Key: KAFKA-6221 > URL: https://issues.apache.org/jira/browse/KAFKA-6221 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.1.1, 0.10.2.0, 0.10.2.1, 0.11.0.1, 1.0.0 > Environment: RHEL 7 >Reporter: Alex Dunayevsky >Priority: Minor > Original Estimate: 336h > Remaining Estimate: 336h > > This issue appeared to happen frequently on 0.10.2.0. > On 0.10.2.1 and 1.0.0 it's a way harder to reproduce. > We'll focus on reproducing it on 0.10.2.1 and 1.0.0. > *TOPOLOGY:* > 3 brokers, 1 zk. > *REPRODUCING STRATEGY:* > Create a few dozens topics (say, 40) one by one, each with replication factor > 2. Number of partitions, generally, does not matter but, for easier > reproduction, should not be too small (around 30 or so). 
> *CREATE 40 TOPICS:* > {code:java} for i in {1..40}; do bin/kafka-topics.sh --create --topic > "topic${i}_p28_r2" --partitions 28 --replication-factor 2 --zookeeper :2165; > done {code} > *ERRORS* > {code:java} > *BROKER 1* > [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,27] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,27] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,9] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,9] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,3] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,3] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. 
(kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,15] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,15] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,21] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,21] to broker > 2:org.apache.kafka.common.errors.UnknownTopi
[jira] [Commented] (KAFKA-6221) ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic creation
[ https://issues.apache.org/jira/browse/KAFKA-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604951#comment-17604951 ] Idrissi commented on KAFKA-6221: Hello , i am facing the same problem with the following message : * {{Message: Broker: Unknown topic or partition from ReplicaFetcherThread .Many topics are created with 6 partitions and replication factor 3. The problems seems to be triggered by the Replica Fetcher thread during the synchronizing with the leader . Do you think its the same problem as mentionned in this JIRA ticket ?}}{{ }} > ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic > creation > --- > > Key: KAFKA-6221 > URL: https://issues.apache.org/jira/browse/KAFKA-6221 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.1.1, 0.10.2.0, 0.10.2.1, 0.11.0.1, 1.0.0 > Environment: RHEL 7 >Reporter: Alex Dunayevsky >Priority: Minor > Original Estimate: 336h > Remaining Estimate: 336h > > This issue appeared to happen frequently on 0.10.2.0. > On 0.10.2.1 and 1.0.0 it's a way harder to reproduce. > We'll focus on reproducing it on 0.10.2.1 and 1.0.0. > *TOPOLOGY:* > 3 brokers, 1 zk. > *REPRODUCING STRATEGY:* > Create a few dozens topics (say, 40) one by one, each with replication factor > 2. Number of partitions, generally, does not matter but, for easier > reproduction, should not be too small (around 30 or so). > *CREATE 40 TOPICS:* > {code:java} for i in {1..40}; do bin/kafka-topics.sh --create --topic > "topic${i}_p28_r2" --partitions 28 --replication-factor 2 --zookeeper :2165; > done {code} > *ERRORS* > {code:java} > *BROKER 1* > [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,27] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. 
(kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,27] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,9] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,9] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,3] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,3] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,15] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,15] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. 
(kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,21] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for > partition [topic1_p28_r2,21] to broker > 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > *BROKER 2* > [2017-11-15 16:46:36,408] ERROR [ReplicaFetcherThread-0-3], Error for > partition [topic20_p28_r2,12] to broker > 3:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This > server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) > [2017-11-15 16:46:36,408] ERROR [ReplicaFetcherThread-0-3], Error for > partition [topic20_p28_r2,12] to br
[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
cmccabe commented on code in PR #12636: URL: https://github.com/apache/kafka/pull/12636#discussion_r971279173 ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ## @@ -138,6 +139,15 @@ public List authorize( return results; } Review Comment: The purpose of doing this is to avoid loading the volatile multiple times. Each time we load a volatile, it is expensive because it requires an interlocked instruction.
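The pattern the comment refers to, reading the volatile field once into a local and using that local for the rest of the call, can be sketched as follows. This is a simplified stand-in, not the StandardAuthorizer code; the class, the `Snapshot` type, and its fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for illustration; not the StandardAuthorizer class.
class SnapshotAuthorizerSketch {
    // Immutable snapshot; a writer replaces the whole object instead of mutating it.
    static final class Snapshot {
        final Set<String> superUsers;
        final boolean allowByDefault;

        Snapshot(Set<String> superUsers, boolean allowByDefault) {
            this.superUsers = Collections.unmodifiableSet(new HashSet<>(superUsers));
            this.allowByDefault = allowByDefault;
        }
    }

    private volatile Snapshot data = new Snapshot(Collections.emptySet(), false);

    // Copy-on-write update: publish a new snapshot with a single volatile write.
    void update(Snapshot newData) {
        this.data = newData;
    }

    List<Boolean> authorize(List<String> principals) {
        Snapshot current = data; // the only volatile read in this call
        List<Boolean> results = new ArrayList<>(principals.size());
        for (String principal : principals) {
            results.add(current.superUsers.contains(principal) || current.allowByDefault);
        }
        return results;
    }
}
```

Besides avoiding repeated volatile reads, the local reference means every decision within one call is made against the same snapshot, even if `update` runs concurrently.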
[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
cmccabe commented on code in PR #12636: URL: https://github.com/apache/kafka/pull/12636#discussion_r971280346 ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ## @@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId, loadingComplete, newSuperUsers, newDefaultResult, -aclsByResource, -aclsById); +acls); } -StandardAuthorizerData copyWithNewAcls(Collection> aclEntries) { -StandardAuthorizerData newData = new StandardAuthorizerData( -log, -aclMutator, -loadingComplete, -superUsers, -defaultRule.result, -new ConcurrentSkipListSet<>(), -new ConcurrentHashMap<>()); -for (Entry entry : aclEntries) { -newData.addAcl(entry.getKey(), entry.getValue()); -} -log.info("Applied {} acl(s) from image.", aclEntries.size()); -return newData; +StandardAuthorizerData copyWithAllNewAcls( +Collection> newAclEntries +) { +return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet()); } -void addAcl(Uuid id, StandardAcl acl) { -try { -StandardAcl prevAcl = aclsById.putIfAbsent(id, acl); -if (prevAcl != null) { -throw new RuntimeException("An ACL with ID " + id + " already exists."); -} -if (!aclsByResource.add(acl)) { -aclsById.remove(id); -throw new RuntimeException("Unable to add the ACL with ID " + id + -" to aclsByResource"); -} -log.trace("Added ACL {}: {}", id, acl); -} catch (Throwable e) { -log.error("addAcl error", e); -throw e; -} +StandardAuthorizerData copyWithAclChanges( +Collection> newAclEntries, +Set removedAclIds +) { +return copyWithNewAcls(acls, newAclEntries, removedAclIds); } -void removeAcl(Uuid id) { -try { -StandardAcl acl = aclsById.remove(id); -if (acl == null) { -throw new RuntimeException("ID " + id + " not found in aclsById."); +StandardAuthorizerData copyWithNewAcls( +StandardAclWithId[] existingAcls, +Collection> newAclEntries, +Set removedAclIds +) { +StandardAclWithId[] newAcls = new StandardAclWithId[ +existingAcls.length + newAclEntries.size() - removedAclIds.size()]; +int 
numRemoved = 0, j = 0; +for (int i = 0; i < existingAcls.length; i++) { +StandardAclWithId aclWithId = existingAcls[i]; +if (removedAclIds.contains(aclWithId.id())) { +numRemoved++; +} else { +newAcls[j++] = aclWithId; } -if (!aclsByResource.remove(acl)) { -throw new RuntimeException("Unable to remove the ACL with ID " + id + -" from aclsByResource"); +} +if (numRemoved < removedAclIds.size()) { +throw new RuntimeException("Only located " + numRemoved + " out of " + +removedAclIds.size() + " removed ACL ID(s). removedAclIds = " + +removedAclIds.stream().map(a -> a.toString()).collect(Collectors.joining(", "))); +} +if (!newAclEntries.isEmpty()) { +int i = 0; +for (Entry entry : newAclEntries) { +newAcls[existingAcls.length + i] = new StandardAclWithId(entry.getKey(), entry.getValue()); +i++; } Review Comment: Duplicate IDs should not happen unless there is a bug. I do wish we could check for it here, but it would be very inefficient to do so, since we'd have to scan the whole array. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
cmccabe commented on code in PR #12636: URL: https://github.com/apache/kafka/pull/12636#discussion_r971280735 ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ## @@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId, loadingComplete, newSuperUsers, newDefaultResult, -aclsByResource, -aclsById); +acls); } -StandardAuthorizerData copyWithNewAcls(Collection> aclEntries) { -StandardAuthorizerData newData = new StandardAuthorizerData( -log, -aclMutator, -loadingComplete, -superUsers, -defaultRule.result, -new ConcurrentSkipListSet<>(), -new ConcurrentHashMap<>()); -for (Entry entry : aclEntries) { -newData.addAcl(entry.getKey(), entry.getValue()); -} -log.info("Applied {} acl(s) from image.", aclEntries.size()); -return newData; +StandardAuthorizerData copyWithAllNewAcls( +Collection> newAclEntries +) { +return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet()); } -void addAcl(Uuid id, StandardAcl acl) { -try { -StandardAcl prevAcl = aclsById.putIfAbsent(id, acl); -if (prevAcl != null) { -throw new RuntimeException("An ACL with ID " + id + " already exists."); -} -if (!aclsByResource.add(acl)) { -aclsById.remove(id); -throw new RuntimeException("Unable to add the ACL with ID " + id + -" to aclsByResource"); -} Review Comment: Like I said above, we'd have to scan the whole array since we don't have a map from id -> acl any more. I don't think it's worth it just for a sanity check -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] artemlivshits commented on pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches
artemlivshits commented on PR #12570: URL: https://github.com/apache/kafka/pull/12570#issuecomment-1247321505 Looked through the failed tests -- seem unrelated (also ran locally - pass).
[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
mumrah commented on code in PR #12636: URL: https://github.com/apache/kafka/pull/12636#discussion_r971306933 ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ## @@ -138,6 +139,15 @@ public List authorize( return results; } +@Override +public AuthorizationResult authorizeByResourceType( Review Comment: If I understand correctly, this implementation was added to take advantage of the new binary search approach in the ACL array. IOW, an optimization over the default `authorizeByResourceType` impl? ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java: ## @@ -76,14 +78,15 @@ public interface ClusterMetadataAuthorizer extends Authorizer { void loadSnapshot(Map acls); /** - * Add a new ACL. Any ACL with the same ID will be replaced. - */ -void addAcl(Uuid id, StandardAcl acl); - -/** - * Remove the ACL with the given ID. + * Add or remove ACLs. + * + * @param newAcls The ACLs to add. + * @param removedAclIds The ACL IDs to remove. */ -void removeAcl(Uuid id); +void applyAclChanges( Review Comment: We should document that this method does not expect duplicates or allow replacing ACL by ID. 
## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ## @@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId, loadingComplete, newSuperUsers, newDefaultResult, -aclsByResource, -aclsById); +acls); } -StandardAuthorizerData copyWithNewAcls(Collection> aclEntries) { -StandardAuthorizerData newData = new StandardAuthorizerData( -log, -aclMutator, -loadingComplete, -superUsers, -defaultRule.result, -new ConcurrentSkipListSet<>(), -new ConcurrentHashMap<>()); -for (Entry entry : aclEntries) { -newData.addAcl(entry.getKey(), entry.getValue()); -} -log.info("Applied {} acl(s) from image.", aclEntries.size()); -return newData; +StandardAuthorizerData copyWithAllNewAcls( +Collection> newAclEntries +) { +return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet()); } -void addAcl(Uuid id, StandardAcl acl) { -try { -StandardAcl prevAcl = aclsById.putIfAbsent(id, acl); -if (prevAcl != null) { -throw new RuntimeException("An ACL with ID " + id + " already exists."); -} -if (!aclsByResource.add(acl)) { -aclsById.remove(id); -throw new RuntimeException("Unable to add the ACL with ID " + id + -" to aclsByResource"); -} -log.trace("Added ACL {}: {}", id, acl); -} catch (Throwable e) { -log.error("addAcl error", e); -throw e; -} +StandardAuthorizerData copyWithAclChanges( +Collection> newAclEntries, +Set removedAclIds +) { +return copyWithNewAcls(acls, newAclEntries, removedAclIds); } -void removeAcl(Uuid id) { -try { -StandardAcl acl = aclsById.remove(id); -if (acl == null) { -throw new RuntimeException("ID " + id + " not found in aclsById."); +StandardAuthorizerData copyWithNewAcls( +StandardAclWithId[] existingAcls, +Collection> newAclEntries, +Set removedAclIds +) { +int newSize = existingAcls.length + newAclEntries.size() - removedAclIds.size(); +StandardAclWithId[] newAcls = new StandardAclWithId[newSize]; +int numRemoved = 0, j = 0; +for (int i = 0; i < existingAcls.length; i++) { +StandardAclWithId 
aclWithId = existingAcls[i]; +if (removedAclIds.contains(aclWithId.id())) { +numRemoved++; +} else { +newAcls[j++] = aclWithId; } -if (!aclsByResource.remove(acl)) { -throw new RuntimeException("Unable to remove the ACL with ID " + id + -" from aclsByResource"); +} +if (numRemoved < removedAclIds.size()) { +throw new RuntimeException("Only located " + numRemoved + " out of " + +removedAclIds.size() + " removed ACL ID(s). removedAclIds = " + +removedAclIds.stream().map(a -> a.toString()).collect(Collectors.joining(", "))); +} +if (!newAclEntries.isEmpty()) { +int i = 0; +for (Entry entry : newAclEntries) { +newAcls[existingAcls.length + i] = new StandardAclWithId(entry.getKey(), entry.getValue()); Review Comment: Should this index be offset by the number we removed? ## metada
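The indexing question raised in this review can be illustrated with a small, self-contained sketch. It is not the PR's code: `CopyOnWriteAclSketch`, `AclWithId`, and `copyWithChanges` are hypothetical stand-ins, IDs and rules are plain strings, and the removal check is done in a separate first pass (an assumption made here so the array can be sized from what was actually found). The point it shows is that appended entries continue from the write cursor `j` (the number of surviving entries) rather than from `existingAcls.length`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class CopyOnWriteAclSketch {
    /** Hypothetical stand-in for StandardAclWithId: an ID plus an opaque rule. */
    static final class AclWithId {
        final String id;
        final String rule;

        AclWithId(String id, String rule) {
            this.id = id;
            this.rule = rule;
        }
    }

    /** Returns a new array; the existing array is never modified. */
    static AclWithId[] copyWithChanges(
        AclWithId[] existing,
        Map<String, String> added,
        Set<String> removedIds
    ) {
        // First pass: count how many requested removals are actually present.
        int numRemoved = 0;
        for (AclWithId acl : existing) {
            if (removedIds.contains(acl.id)) {
                numRemoved++;
            }
        }
        if (numRemoved != removedIds.size()) {
            throw new IllegalStateException("Located " + numRemoved +
                " match(es) for " + removedIds.size() + " removed ACL ID(s).");
        }
        AclWithId[] result = new AclWithId[existing.length - numRemoved + added.size()];
        int j = 0; // write cursor: next free slot in result
        for (AclWithId acl : existing) {
            if (!removedIds.contains(acl.id)) {
                result[j++] = acl;
            }
        }
        // Additions continue from j (the survivor count), not from existing.length.
        for (Map.Entry<String, String> entry : added.entrySet()) {
            result[j++] = new AclWithId(entry.getKey(), entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        AclWithId[] existing = {
            new AclWithId("a", "r1"), new AclWithId("b", "r2"), new AclWithId("c", "r3")
        };
        Map<String, String> added = new LinkedHashMap<>();
        added.put("d", "r4");
        AclWithId[] updated = copyWithChanges(existing, added, Collections.singleton("b"));
        for (AclWithId acl : updated) {
            System.out.println(acl.id + " -> " + acl.rule);
        }
    }
}
```

With three existing entries, one removal, and one addition, writing the addition at `existing.length + 0` would target slot 3 of a 3-element array, which is out of bounds; writing at `j` places it in slot 2.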
[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
cmccabe commented on code in PR #12636: URL: https://github.com/apache/kafka/pull/12636#discussion_r971319874 ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ## @@ -138,6 +139,15 @@ public List authorize( return results; } +@Override +public AuthorizationResult authorizeByResourceType( Review Comment: mainly I added this because the benchmarks looked really bad without it 😬
[GitHub] [kafka] lihaosky commented on a diff in pull request #12638: Register and unregister changelog topics in state updater
lihaosky commented on code in PR #12638: URL: https://github.com/apache/kafka/pull/12638#discussion_r971383169 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java: ## @@ -31,6 +33,8 @@ public interface ChangelogRegister { */ void register(final TopicPartition partition, final ProcessorStateManager stateManager); +void register(final Set partition, final ProcessorStateManager stateManager); Review Comment: nit: partition -> partitions? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -444,6 +451,8 @@ public void restore(final Map tasks) { final Set corruptedTasks = new HashSet<>(); e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId())); throw new TaskCorruptedException(corruptedTasks, e); +} catch (final InterruptException interruptException) { +throw interruptException; Review Comment: QQ: will `InterruptException` be thrown even without this? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -347,6 +348,12 @@ public void register(final TopicPartition partition, final ProcessorStateManager } } +public void register(final Set changelogPartitions, final ProcessorStateManager stateManager) { Review Comment: nit: `@Override` ## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java: ## @@ -209,7 +212,9 @@ void registerStateStores(final List allStores, final InternalProcess processorContext.uninitialize(); for (final StateStore store : allStores) { if (stores.containsKey(store.name())) { -maybeRegisterStoreWithChangelogReader(store.name()); +if (!stateUpdaterEnabled) { +maybeRegisterStoreWithChangelogReader(store.name()); Review Comment: QQ: this method is called `registerStateStores` but why does it expect `store` to be already in `stores`? 
The only place I can find where `stores.put` is called is in `registerStore`, and in that method `maybeRegisterStoreWithChangelogReader` is called immediately after `stores.put`. So I'm confused about the real purpose of this method and whether the `maybeRegisterStoreWithChangelogReader` call here is redundant.
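On the `InterruptException` question in this review, a minimal sketch of the Java rule involved may help. It assumes, without the diff confirming it, that a broader catch follows in the same `try` statement; the exception classes below are hypothetical stand-ins, not Kafka's. An exception that matches no catch clause propagates on its own, so an explicit rethrow only changes behavior when a later, broader clause would otherwise intercept the exception and wrap it.

```java
final class RethrowOrderSketch {
    // Hypothetical stand-ins: a base exception, a subclass playing the role
    // of the interrupt exception, and a wrapper thrown by the broad catch.
    static class BaseException extends RuntimeException { }
    static class InterruptLikeException extends BaseException { }
    static class WrappedException extends RuntimeException {
        WrappedException(Throwable cause) {
            super(cause);
        }
    }

    static void poll() {
        throw new InterruptLikeException();
    }

    /** The narrow catch rethrows before the broad catch can wrap the exception. */
    static void withExplicitRethrow() {
        try {
            poll();
        } catch (InterruptLikeException e) {
            throw e;
        } catch (BaseException e) {
            throw new WrappedException(e);
        }
    }

    /** Without the narrow catch, the broad catch wraps the exception. */
    static void withoutExplicitRethrow() {
        try {
            poll();
        } catch (BaseException e) {
            throw new WrappedException(e);
        }
    }

    /** Helper: simple name of the exception type a task throws, or "none". */
    static String thrownBy(Runnable task) {
        try {
            task.run();
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(thrownBy(RethrowOrderSketch::withExplicitRethrow));
        System.out.println(thrownBy(RethrowOrderSketch::withoutExplicitRethrow));
    }
}
```

If no broader catch exists after the new clause, the rethrow is a no-op and the exception would surface unchanged either way.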
[jira] [Created] (KAFKA-14234) /admin/delete_topics is not in the list of zookeeper watchers
Yan Xue created KAFKA-14234:
---
Summary: /admin/delete_topics is not in the list of zookeeper watchers
Key: KAFKA-14234
URL: https://issues.apache.org/jira/browse/KAFKA-14234
Project: Kafka
Issue Type: Bug
Components: controller
Affects Versions: 3.2.1
Reporter: Yan Xue

I deployed a Kafka cluster on Kubernetes and am trying to figure out how topic deletion works. I know the Kafka controller has a topic deletion manager that watches for node changes in ZooKeeper. Whenever a topic is deleted, the manager is triggered. I expected to see {{/admin/delete_topics}} in the watcher list. However, I didn't find it. Sample output:

root@kafka-broker-2:/opt/kafka# echo wchc | nc ZOOKEEPER_IP 2181
0x20010021139
/admin/preferred_replica_election
/brokers/ids/0
/brokers/ids/1
/brokers/ids/2
/brokers/topics/__consumer_offsets
/brokers/ids/3
/brokers/ids/4
/controller
/admin/reassign_partitions
/brokers/topics/test-test
/feature
0x200100211390001
/controller
/feature
0x1631f9
/controller
/feature

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14234) /admin/delete_topics is not in the list of zookeeper watchers
[ https://issues.apache.org/jira/browse/KAFKA-14234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yan Xue updated KAFKA-14234:
Description:
I deployed a Kafka cluster on Kubernetes and am trying to figure out how topic deletion works. I know the Kafka controller has a topic deletion manager that watches for node changes in ZooKeeper. Whenever a topic is deleted, the manager is triggered. I expected to see {{/admin/delete_topics}} in the watcher list. However, I didn't find it. Sample output:

root@kafka-broker-2:/opt/kafka# echo wchc | nc ZOOKEEPER_IP 2181
0x20010021139
/admin/preferred_replica_election
/brokers/ids/0
/brokers/ids/1
/brokers/ids/2
/brokers/topics/__consumer_offsets
/brokers/ids/3
/brokers/ids/4
/controller
/admin/reassign_partitions
/brokers/topics/test-test
/feature
0x200100211390001
/controller
/feature
0x1631f9
/controller
/feature

Even though I can delete the topic, I am confused about the output.

was:
I deployed a Kafka cluster on Kubernetes and am trying to figure out how topic deletion works. I know the Kafka controller has a topic deletion manager that watches for node changes in ZooKeeper. Whenever a topic is deleted, the manager is triggered. I expected to see {{/admin/delete_topics}} in the watcher list. However, I didn't find it. Sample output:

root@kafka-broker-2:/opt/kafka# echo wchc | nc ZOOKEEPER_IP 2181
0x20010021139
/admin/preferred_replica_election
/brokers/ids/0
/brokers/ids/1
/brokers/ids/2
/brokers/topics/__consumer_offsets
/brokers/ids/3
/brokers/ids/4
/controller
/admin/reassign_partitions
/brokers/topics/test-test
/feature
0x200100211390001
/controller
/feature
0x1631f9
/controller
/feature

> /admin/delete_topics is not in the list of zookeeper watchers
> -
>
> Key: KAFKA-14234
> URL: https://issues.apache.org/jira/browse/KAFKA-14234
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 3.2.1
> Reporter: Yan Xue
> Priority: Minor
>
> I deployed a Kafka cluster on Kubernetes and am trying to figure out how
> topic deletion works. I know the Kafka controller has a topic deletion manager
> that watches for node changes in ZooKeeper. Whenever a topic is deleted,
> the manager is triggered. I expected to see {{/admin/delete_topics}}
> in the watcher list. However, I didn't find it. Sample output:
> root@kafka-broker-2:/opt/kafka# echo wchc | nc ZOOKEEPER_IP 2181
> 0x20010021139
> /admin/preferred_replica_election
> /brokers/ids/0
> /brokers/ids/1
> /brokers/ids/2
> /brokers/topics/__consumer_offsets
> /brokers/ids/3
> /brokers/ids/4
> /controller
> /admin/reassign_partitions
> /brokers/topics/test-test
> /feature
> 0x200100211390001
> /controller
> /feature
> 0x1631f9
> /controller
> /feature
>
> Even though I can delete the topic, I am confused about the output.
[GitHub] [kafka] junrao merged pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches
junrao merged PR #12570: URL: https://github.com/apache/kafka/pull/12570
[jira] [Resolved] (KAFKA-14156) Built-in partitioner may create suboptimal batches with large linger.ms
[ https://issues.apache.org/jira/browse/KAFKA-14156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-14156.
-
Assignee: Artem Livshits
Resolution: Fixed

Merged the PR to 3.3 and trunk.

> Built-in partitioner may create suboptimal batches with large linger.ms
> ---
>
> Key: KAFKA-14156
> URL: https://issues.apache.org/jira/browse/KAFKA-14156
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 3.3.0
> Reporter: Artem Livshits
> Assignee: Artem Livshits
> Priority: Blocker
> Fix For: 3.3.0
>
> The new built-in "sticky" partitioner switches partitions based on the number
> of bytes produced to a partition. It doesn't use batch creation as a switch
> trigger. The previous "sticky" DefaultPartitioner switched partitions when a
> new batch was created, which with a small linger.ms (default is 0) could result
> in sending larger batches to slower brokers, potentially overloading them. See
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> for more detail.
> However, with a large linger.ms, the new built-in partitioner may create
> suboptimal batches. Consider an example: suppose linger.ms=500,
> batch.size=16KB (default) and we produce 24KB / sec, i.e. every 500ms we
> produce 12KB worth of data. The new built-in partitioner would switch
> partitions on every 16KB, so we could get into the following batching pattern:
> * produce 12KB to one partition in 500ms, hit linger, send 12KB batch
> * produce 4KB more to the same partition, now we've produced 16KB of data,
> switch partition
> * produce 12KB to the second partition in 500ms, hit linger, send 12KB batch
> * in the meantime the 4KB produced to the first partition would hit linger
> as well, sending 4KB batch
> * produce 4KB more to the second partition, now we've produced 16KB of data
> to the second partition, switch to 3rd partition
> So in this scenario the new built-in partitioner produces a mix of 12KB and
> 4KB batches, while the previous DefaultPartitioner would produce only 12KB
> batches -- it switches on new batch creation, so there are no "mid-linger"
> leftover batches.
> To avoid batch fragmentation on partition switch, we can wait
> until the batch is ready before switching the partition, i.e. the condition
> to switch to a new partition would be "produced batch.size bytes" AND "batch
> is not lingering". This may potentially introduce some non-uniformity into
> data distribution, but unlike the previous DefaultPartitioner, the
> non-uniformity would not be based on broker performance and won't
> re-introduce the bad pattern of sending more data to slower brokers.
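The switch condition proposed in the issue above can be written down as a small predicate. This is only an illustrative sketch, not the producer's actual implementation: `StickySwitchSketch`, `byteOnlySwitch`, and `proposedSwitch` are hypothetical names, and the "batch is lingering" state is passed in as a plain boolean. The `main` method replays the 16KB point from the example (12KB already sent, 4KB sitting in an open batch that is still lingering).

```java
final class StickySwitchSketch {
    /** Byte-count rule: switch as soon as batch.size bytes went to the partition. */
    static boolean byteOnlySwitch(long producedBytes, long batchSize) {
        return producedBytes >= batchSize;
    }

    /** Proposed rule: "produced batch.size bytes" AND "batch is not lingering". */
    static boolean proposedSwitch(long producedBytes, long batchSize, boolean batchLingering) {
        return producedBytes >= batchSize && !batchLingering;
    }

    public static void main(String[] args) {
        long batchSize = 16 * 1024;
        long produced = 16 * 1024; // 12KB batch already sent + 4KB in an open batch
        boolean lingering = true;  // the 4KB batch is still waiting out linger.ms

        // Byte-only rule switches now and strands the 4KB fragment.
        System.out.println("byte-only switches: " + byteOnlySwitch(produced, batchSize));
        // Proposed rule keeps the partition until the open batch is ready.
        System.out.println("proposed switches:  " + proposedSwitch(produced, batchSize, lingering));
    }
}
```

Under the proposed rule the partitioner stays on the current partition until the open batch is sent, so the 4KB leftover keeps accumulating records instead of going out as its own small batch.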
[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Christo Lolov updated KAFKA-14132:
--
Description:
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened:

{color:#FF8B00}InReview{color}
{color:#00875A}Merged{color}
# ErrorHandlingTaskTest (owner: Divij)
# SourceTaskOffsetCommiterTest (owner: Divij)
# WorkerMetricsGroupTest (owner: Divij)
# WorkerSinkTaskTest (owner: Divij)
# WorkerSinkTaskThreadedTest (owner: Divij)
# {color:#00875A}WorkerTaskTest{color} (owner: [~yash.mayya])
# {color:#00875A}ErrorReporterTest{color} (owner: [~yash.mayya])
# {color:#00875A}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
# {color:#00875A}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
# ConnectorsResourceTest
# StandaloneHerderTest
# KafkaConfigBackingStoreTest
# KafkaOffsetBackingStoreTest (owner: Christo) ([https://github.com/apache/kafka/pull/12418])
# KafkaBasedLogTest
# RetryUtilTest
# RepartitionTopicTest (streams) (owner: Christo)
# StateManagerUtilTest (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what the coverage is now.*

was:
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened:
# ErrorHandlingTaskTest (owner: Divij)
# SourceTaskOffsetCommiterTest (owner: Divij)
# WorkerMetricsGroupTest (owner: Divij)
# WorkerSinkTaskTest (owner: Divij)
# WorkerSinkTaskThreadedTest (owner: Divij)
# WorkerTaskTest (owner: [~yash.mayya])
# ErrorReporterTest (owner: [~yash.mayya])
# RetryWithToleranceOperatorTest (owner: [~yash.mayya])
# WorkerErrantRecordReporterTest (owner: [~yash.mayya])
# ConnectorsResourceTest
# StandaloneHerderTest
# KafkaConfigBackingStoreTest
# KafkaOffsetBackingStoreTest (owner: Christo) ([https://github.com/apache/kafka/pull/12418])
# KafkaBasedLogTest
# RetryUtilTest
# RepartitionTopicTest (streams) (owner: Christo)
# StateManagerUtilTest (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what the coverage is now.*

> Remaining PowerMock to Mockito tests
>
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Christo Lolov
> Assignee: Christo Lolov
> Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as
> of 2nd of August 2022 which do not have a Jira issue and do not have pull
> requests I am aware of which are opened:
> {color:#FF8B00}InReview{color}
> {color:#00875A}Merged{color}
> # ErrorHandlingTaskTest (owner: Divij)
> # SourceTaskOffsetCommiterTest (owner: Divij)
> # WorkerMetricsGroupTest (owner: Divij)
> # WorkerSinkTaskTest (owner: Divij)
> # WorkerSinkTaskThreadedTest (owner: Divij)
> # {color:#00875A}WorkerTaskTest{color} (owner: [~yash.mayya])
> # {color:#00875A}ErrorReporterTest{color} (owner: [~yash.mayya])
> # {color:#00875A}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
> # {color:#00875A}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
> # ConnectorsResourceTest
> # StandaloneHerderTest
> # KafkaConfigBackingStoreTest
> # KafkaOffsetBackingStoreTest (owner: Christo)
> ([https://github.com/apache/kafka/pull/12418])
> # KafkaBasedLogTest
> # RetryUtilTest
> # RepartitionTopicTest (streams) (owner: Christo)
> # StateManagerUtilTest (streams) (owner: Christo)
> *The coverage report for the above tests after the change should be >= to
> what the coverage is now.*
[jira] [Commented] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605087#comment-17605087 ]

Christo Lolov commented on KAFKA-14132:
---
Hello [~ChrisEgerton]! I have followed the colour-coding I have been using in https://issues.apache.org/jira/browse/KAFKA-14133. Thank you for the reviews.

> Remaining PowerMock to Mockito tests
>
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Christo Lolov
> Assignee: Christo Lolov
> Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as
> of 2nd of August 2022 which do not have a Jira issue and do not have pull
> requests I am aware of which are opened:
> {color:#FF8B00}InReview{color}
> {color:#00875A}Merged{color}
> # ErrorHandlingTaskTest (owner: Divij)
> # SourceTaskOffsetCommiterTest (owner: Divij)
> # WorkerMetricsGroupTest (owner: Divij)
> # WorkerSinkTaskTest (owner: Divij)
> # WorkerSinkTaskThreadedTest (owner: Divij)
> # {color:#00875A}WorkerTaskTest{color} (owner: [~yash.mayya])
> # {color:#00875A}ErrorReporterTest{color} (owner: [~yash.mayya])
> # {color:#00875A}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
> # {color:#00875A}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
> # ConnectorsResourceTest
> # StandaloneHerderTest
> # KafkaConfigBackingStoreTest
> # KafkaOffsetBackingStoreTest (owner: Christo)
> ([https://github.com/apache/kafka/pull/12418])
> # KafkaBasedLogTest
> # RetryUtilTest
> # RepartitionTopicTest (streams) (owner: Christo)
> # StateManagerUtilTest (streams) (owner: Christo)
> *The coverage report for the above tests after the change should be >= to
> what the coverage is now.*