[jira] [Updated] (KAFKA-10147) MockAdminClient#describeConfigs(Collection) is unable to handle broker resource
[ https://issues.apache.org/jira/browse/KAFKA-10147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10147: --- Fix Version/s: (was: 2.7.0) 2.6.0 > MockAdminClient#describeConfigs(Collection) is unable to > handle broker resource > --- > > Key: KAFKA-10147 > URL: https://issues.apache.org/jira/browse/KAFKA-10147 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Blocker > Fix For: 2.6.0 > > > MockAdminClient#describeConfigs(Collection) has new > implementation introduced by > https://github.com/apache/kafka/commit/48b56e533b3ff22ae0e2cf7fcc649e7df19f2b06. > It does not handle broker resource so > ReassignPartitionsUnitTest#testModifyBrokerThrottles throws NPE -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10147) MockAdminClient#describeConfigs(Collection) is unable to handle broker resource
[ https://issues.apache.org/jira/browse/KAFKA-10147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10147: --- Fix Version/s: 2.7.0 > MockAdminClient#describeConfigs(Collection) is unable to > handle broker resource > --- > > Key: KAFKA-10147 > URL: https://issues.apache.org/jira/browse/KAFKA-10147 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Blocker > Fix For: 2.7.0 > > > MockAdminClient#describeConfigs(Collection) has new > implementation introduced by > https://github.com/apache/kafka/commit/48b56e533b3ff22ae0e2cf7fcc649e7df19f2b06. > It does not handle broker resource so > ReassignPartitionsUnitTest#testModifyBrokerThrottles throws NPE
[GitHub] [kafka] kkonstantine commented on a change in pull request #8839: KIP-585: Documentation
kkonstantine commented on a change in pull request #8839: URL: https://github.com/apache/kafka/pull/8839#discussion_r439227501 ## File path: docs/connect.html ## @@ -180,13 +182,80 @@ TransformationsSetSchemaMetadata - modify the schema name or version TimestampRouter - Modify the topic of a record based on original topic and timestamp. Useful when using a sink that needs to write to different tables or indexes based on timestamps RegexRouter - modify the topic of a record based on original topic, replacement string and a regular expression +Filter - Removes messages from all further processing. This is used with a predicate to selectively filter certain messages. Details on how to configure each transformation are listed below: + +Predicates + +Transformations can be configured with prediates so that the transformation is applied only to messages which satisfy some condition. In particular, when combined with the Filter transformation predicates can be used to selectively filter out certain messages. Review comment: ```suggestion Transformations can be configured with predicates so that the transformation is applied only to messages which satisfy some condition. In particular, when combined with the Filter transformation predicates can be used to selectively filter out certain messages. ``` ## File path: docs/connect.html ## @@ -180,13 +182,80 @@ TransformationsSetSchemaMetadata - modify the schema name or version TimestampRouter - Modify the topic of a record based on original topic and timestamp. Useful when using a sink that needs to write to different tables or indexes based on timestamps RegexRouter - modify the topic of a record based on original topic, replacement string and a regular expression +Filter - Removes messages from all further processing. This is used with a predicate to selectively filter certain messages. 
Details on how to configure each transformation are listed below: + +Predicates + +Transformations can be configured with predicates so that the transformation is applied only to messages which satisfy some condition. In particular, when combined with the Filter transformation predicates can be used to selectively filter out certain messages. + +Predicates are specified in the connector configuration. + + +predicates - Set of aliases for the predicates to be applied to some of the transformations. +predicates.$alias.type - Fully qualified class name for the predicate. +predicates.$alias.$predicateSpecificConfig - Configuration properties for the predicate. + + +All transformations have the implicit config properties predicate and negate. A particular predicate is associated with a transformation by setting the transformation's predicate config to the predicate's alias. The predicate's value can be reversed using the negate configuration property. + +For example, suppose you have a source connector which produces messages to many different topics and you want to: + +filter out the messages in the 'foo' topic entirely +apply the ExtractField transformation with the field name 'other_field' to records in all topics except the topic 'bar' + + +To do this we first need to filter out the records destined for the topic 'foo'. The Filter transformation removes records from further processing, and can use the TopicNameMatches predicate to apply the transformation only to records in topics which match a certain regular expression. TopicNameMatches's only configuration property is pattern, which is a Java regular expression for matching against the topic name. 
The configuration would look like this: + + +transforms=Filter +transforms.Filter.type=org.apache.kafka.connect.transforms.Filter +transforms.Filter.predicate=IsFoo + +predicates=IsFoo +predicates.IsFoo.type=org.apache.kafka.connect.predicates.TopicNameMatches +predicates.IsFoo.pattern=foo + + +Next we need to apply ExtractField only when the topic name of the record is not 'bar'. We can't just use TopicNameMatches directly, because that would apply the transformation to matching topic names, not topic names which do not match. The transformation's implicit negate config property allows us to invert the set of records which a predicate matches. Adding the configuration for this to the previous example we arrive at: + + +transforms=Filter,Extract +transforms.Filter.type=org.apache.kafka.connect.transforms.Filter +transforms.Filter.predicate=IsFoo + +transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key +transforms.Extract.field=other_field +transforms.Extract.predicate=IsBar +transforms.Extract.negate=true
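The predicate-plus-negate behaviour in the example above can be sketched outside Connect as a plain stream pipeline. This is a toy model, NOT the real Connect SMT/predicate API; the pattern "bar" for the IsBar predicate is assumed from the surrounding prose, since its definition is not shown in the snippet.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Toy model of the example above -- NOT the real Connect SMT/predicate API.
// IsFoo guards the Filter transform; IsBar with negate=true guards Extract.
public class PredicateNegateDemo {
    public static void main(String[] args) {
        Pattern isFoo = Pattern.compile("foo"); // predicates.IsFoo.pattern=foo
        Pattern isBar = Pattern.compile("bar"); // assumed IsBar pattern

        List<String> topics = List.of("foo", "bar", "baz");
        List<String> out = topics.stream()
            // Filter transform with predicate=IsFoo: drop records whose topic matches
            .filter(t -> !isFoo.matcher(t).matches())
            // Extract transform with predicate=IsBar, negate=true: apply only
            // when the topic does NOT match "bar"
            .map(t -> isBar.matcher(t).matches() ? t : t + ":extracted")
            .collect(Collectors.toList());

        System.out.println(out); // [bar, baz:extracted]
    }
}
```

Records for topic 'foo' are dropped entirely, records for 'bar' pass through untouched, and everything else gets the (simulated) extraction applied.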
[GitHub] [kafka] ijuma commented on pull request #8857: KAFKA-10157: Fix broken tests due to InterruptedException from FinalizedFeatureChangeListener
ijuma commented on pull request #8857: URL: https://github.com/apache/kafka/pull/8857#issuecomment-643081702 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on pull request #8857: KAFKA-10157: Fix broken tests due to InterruptedException from FinalizedFeatureChangeListener
kowshik commented on pull request #8857: URL: https://github.com/apache/kafka/pull/8857#issuecomment-643080803 cc @ijuma @apovzner @junrao @abbccdda for review.
[GitHub] [kafka] kowshik opened a new pull request #8857: KAFKA-10157: Fix broken tests due to FinalizedFeatureChangeListener i…
kowshik opened a new pull request #8857: URL: https://github.com/apache/kafka/pull/8857 The call to `ChangeNotificationProcessorThread.queue.take()` can throw an `InterruptedException`. While the queue is empty and the thread is blocked taking an item from it, a concurrent call to `FinalizedFeatureChangeListener.close()` can interrupt the thread and cause an `InterruptedException` to be raised from `queue.take()`. In such a case it is safe to ignore the exception, since the thread is being shut down. Ignoring the `InterruptedException` for this reason was clearly the intent of the code, which used the `ignoring` clause for that purpose. Unfortunately, the `ignoring` clause does not ignore `InterruptedException`, so that approach does not work for us. To confirm this theory, I found the following code in `scala.util.control.Exception.scala`: https://github.com/scala/scala/blob/v2.12.0/src/library/scala/util/control/Exception.scala#L167-L176. The fix in this PR is simply to stop using the `ignoring` clause. **Test plan:** Ran the unit and integration tests and confirmed the failures are gone. I will wait for CI to pass before merging this PR.
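The shutdown path the PR describes follows a standard concurrency pattern: a thread blocked in `LinkedBlockingQueue.take()` is woken by an interrupt from `close()`, and the resulting `InterruptedException` is treated as a shutdown signal rather than an error. A minimal standalone sketch of that pattern (not Kafka's actual `FinalizedFeatureChangeListener` code):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Standalone sketch (not Kafka's actual code) of the shutdown pattern in the
// PR: a worker blocked on queue.take() is interrupted by close(), and the
// InterruptedException is swallowed as a shutdown signal.
public class InterruptibleQueueWorker {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Thread worker = new Thread(() -> {
        try {
            while (true) {
                String event = queue.take(); // blocks; interrupted on close()
                System.out.println("processed " + event);
            }
        } catch (InterruptedException e) {
            // Safe to ignore: close() interrupted us on purpose during shutdown.
        }
    });

    public void start() { worker.start(); }

    public void close() throws InterruptedException {
        worker.interrupt(); // wakes the blocked take()
        worker.join(5000);  // wait for a clean exit
    }

    public boolean isRunning() { return worker.isAlive(); }

    public static void main(String[] args) throws InterruptedException {
        InterruptibleQueueWorker w = new InterruptibleQueueWorker();
        w.start();
        TimeUnit.MILLISECONDS.sleep(100); // let the worker block on take()
        w.close();
        System.out.println("worker still running: " + w.isRunning());
    }
}
```

Catching the exception directly, as above, is exactly what replacing the `ignoring` clause amounts to: the catch block runs on shutdown and the thread exits cleanly instead of logging an error.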
[jira] [Updated] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Guo updated KAFKA-10134: - Description: We want to utilize the new rebalance protocol to mitigate the stop-the-world effect during the rebalance, as our tasks are long-running. But after the upgrade, when we try to kill an instance to trigger a rebalance while there is some load (some are long-running tasks >30s), the CPU goes sky-high. It reads ~700% in our metrics, so several threads are likely in a tight loop. We have several consumer threads consuming from different partitions during the rebalance. This is reproducible with both the new CooperativeStickyAssignor and the old eager rebalance protocol. The difference is that with the old eager rebalance protocol the high CPU usage drops after the rebalance is done, but with the cooperative one the consumer threads seem stuck on something and cannot finish the rebalance, so the high CPU usage does not drop until we stop our load. A small load without long-running tasks also does not cause continuous high CPU usage, as the rebalance can finish in that case. 
"executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) at By debugging into the code we found it looks like the clients are in a loop on finding the coordinator. I also tried the old rebalance protocol for the new version the issue still exists but the CPU will be back to normal when the rebalance is done. Also tried the same on the 2.4.1 which seems don't have this issue. So it seems related something changed between 2.4.1 and 2.5.0. was: We want to utilize the new rebalance protocol to mitigate the stop-the-world effect during the rebalance as our tasks are long running task. But after the upgrade when we try to kill an instance when there is some load(long running tasks >30S) there, the CPU will go sky-high. It reads ~700% in our metrics so it should several threads are in a tight loop. 
"executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) at By debugging into the code we found it looks like the clients are in a loop on finding the coordinator. I also tried the old rebalance protocol for the new version the issue still exists but the CPU will be back to normal when the rebalance is done. Also tried the same on the 2.4.1 which seems don't have this issue. So it seems related something changed between 2.4.1 and 2.5.0. > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Priority: Major > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. 
The > difference is that with old ea
[jira] [Created] (KAFKA-10157) Multiple tests failed due to "Failed to process feature ZK node change event"
Anna Povzner created KAFKA-10157: Summary: Multiple tests failed due to "Failed to process feature ZK node change event" Key: KAFKA-10157 URL: https://issues.apache.org/jira/browse/KAFKA-10157 Project: Kafka Issue Type: Bug Reporter: Anna Povzner Multiple tests failed due to "Failed to process feature ZK node change event". Looks like a result of merge of this PR: [https://github.com/apache/kafka/pull/8680] Note that running tests without `--info` gives output like this one: {quote}Process 'Gradle Test Executor 36' finished with non-zero exit value 1 {quote} kafka.network.DynamicConnectionQuotaTest failed: {quote} kafka.network.DynamicConnectionQuotaTest > testDynamicConnectionQuota STANDARD_OUT [2020-06-11 20:52:42,596] ERROR [feature-zk-node-event-process-thread]: Failed to process feature ZK node change event. The broker will eventually exit. (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76) java.lang.InterruptedException at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056) at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090) at java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433) at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147){quote} kafka.api.CustomQuotaCallbackTest failed: {quote} [2020-06-11 21:07:36,745] ERROR [feature-zk-node-event-process-thread]: Failed to process feature ZK node change event. The broker will eventually exit. 
(kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76) java.lang.InterruptedException at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056) at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090) at java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433) at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at scala.util.control.Exception$Catch.apply(Exception.scala:227) at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at scala.util.control.Exception$Catch.apply(Exception.scala:227) at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) {quote} kafka.server.DynamicBrokerReconfigurationTest failed: {quote} [2020-06-11 21:13:01,207] ERROR [feature-zk-node-event-process-thread]: Failed to process feature ZK node change event. The broker will eventually exit. 
(kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76) java.lang.InterruptedException at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056) at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090) at java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433) at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at scala.util.control.Exception$Catch.apply(Exception.scala:227) at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10155) Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles
[ https://issues.apache.org/jira/browse/KAFKA-10155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-10155: --- Assignee: Chia-Ping Tsai > Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles > --- > > Key: KAFKA-10155 > URL: https://issues.apache.org/jira/browse/KAFKA-10155 > Project: Kafka > Issue Type: Bug > Components: admin >Reporter: Sophie Blee-Goldman >Assignee: Chia-Ping Tsai >Priority: Major > > Seems to fail more often than not. I've seen this fail on almost every single > build of the latest PRs > > h3. Stacktrace > java.lang.NullPointerException at > kafka.admin.ReassignPartitionsUnitTest.verifyBrokerThrottleResults(ReassignPartitionsUnitTest.scala:577) > at > kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles(ReassignPartitionsUnitTest.scala:540) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:564) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) > at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at > java.base/java.lang.Thread.run(Thread.java:830) > h3. Standard Output > Current partition replica assignment > \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]} > Proposed partition reassignment configuration > \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[0,3,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,3],"log_dirs":["any","any","any"]}]} > Current partition replica assignment > \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]} > Proposed partition reassignment configuration > \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10155) Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles
[ https://issues.apache.org/jira/browse/KAFKA-10155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17133891#comment-17133891 ] Sophie Blee-Goldman commented on KAFKA-10155: - Oh cool, thanks for the heads up. I'll assign this ticket to you then > Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles > --- > > Key: KAFKA-10155 > URL: https://issues.apache.org/jira/browse/KAFKA-10155 > Project: Kafka > Issue Type: Bug > Components: admin >Reporter: Sophie Blee-Goldman >Priority: Major > > Seems to fail more often than not. I've seen this fail on almost every single > build of the latest PRs > > h3. Stacktrace > java.lang.NullPointerException at > kafka.admin.ReassignPartitionsUnitTest.verifyBrokerThrottleResults(ReassignPartitionsUnitTest.scala:577) > at > kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles(ReassignPartitionsUnitTest.scala:540) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:564) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) > at > 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at > java.base/java.lang.Thread.run(Thread.java:830) > h3. Standard Output > Current partition replica assignment > \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]} > Proposed partition reassignment configuration > \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[0,3,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,3],"log_dirs":["any","any","any"]}]} > Current partition replica assignment > \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]} > Proposed partition reassignment configuration > \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10155) Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles
[ https://issues.apache.org/jira/browse/KAFKA-10155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17133856#comment-17133856 ] Chia-Ping Tsai commented on KAFKA-10155: This issue will be resolved by KAFKA-10147 > Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles > --- > > Key: KAFKA-10155 > URL: https://issues.apache.org/jira/browse/KAFKA-10155 > Project: Kafka > Issue Type: Bug > Components: admin >Reporter: Sophie Blee-Goldman >Priority: Major > > Seems to fail more often than not. I've seen this fail on almost every single > build of the latest PRs > > h3. Stacktrace > java.lang.NullPointerException at > kafka.admin.ReassignPartitionsUnitTest.verifyBrokerThrottleResults(ReassignPartitionsUnitTest.scala:577) > at > kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles(ReassignPartitionsUnitTest.scala:540) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:564) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) > at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at > java.base/java.lang.Thread.run(Thread.java:830) > h3. Standard Output > Current partition replica assignment > \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]} > Proposed partition reassignment configuration > \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[0,3,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,3],"log_dirs":["any","any","any"]}]} > Current partition replica assignment > \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]} > Proposed partition reassignment configuration > \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah commented on pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition
mumrah commented on pull request #8841: URL: https://github.com/apache/kafka/pull/8841#issuecomment-643035118 The test failures seem to be flaky (KAFKA-10155)
[GitHub] [kafka] chia7712 commented on a change in pull request #8853: KAFKA-10147 MockAdminClient#describeConfigs(Collection
chia7712 commented on a change in pull request #8853: URL: https://github.com/apache/kafka/pull/8853#discussion_r439170177 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java ## @@ -560,10 +529,14 @@ synchronized private Config getResourceDescription(ConfigResource resource) { } case TOPIC: { TopicMetadata topicMetadata = allTopics.get(resource.name()); -if (topicMetadata == null) { -throw new UnknownTopicOrPartitionException(); +if (topicMetadata != null && !topicMetadata.markedForDeletion) { +if (topicMetadata.fetchesRemainingUntilVisible > 0) topicMetadata.fetchesRemainingUntilVisible--; Review comment: done
[GitHub] [kafka] chia7712 commented on a change in pull request #8853: KAFKA-10147 MockAdminClient#describeConfigs(Collection
chia7712 commented on a change in pull request #8853: URL: https://github.com/apache/kafka/pull/8853#discussion_r439167614 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java ## @@ -365,51 +366,6 @@ synchronized public DescribeTopicsResult describeTopics(Collection topic return new DescribeTopicsResult(topicDescriptions); } -@Override -public DescribeConfigsResult describeConfigs(Collection resources) { Review comment: this method has a default implementation (see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java#L340) which calls the variant ```describeConfigs(Collection, DescribeConfigsOptions)```, so it is safe to remove the overriding implementation.
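The delegation the comment relies on can be illustrated with a stripped-down interface (hypothetical names, not the real `Admin` API): the single-argument default method forwards to the two-argument overload, so a mock only needs to override the latter for both call sites to work.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical mini version of an admin interface, illustrating why removing
// the single-argument override is safe: the interface's default method
// forwards to the two-argument variant, which the mock does override.
public class DefaultMethodDemo {
    interface MiniAdmin {
        // Default delegation, mirroring the pattern in Admin.describeConfigs
        default String describeConfigs(Collection<String> resources) {
            return describeConfigs(resources, "default-options");
        }

        String describeConfigs(Collection<String> resources, String options);
    }

    static class MockMiniAdmin implements MiniAdmin {
        @Override
        public String describeConfigs(Collection<String> resources, String options) {
            return "described " + resources.size() + " resource(s) with " + options;
        }
    }

    public static void main(String[] args) {
        MiniAdmin admin = new MockMiniAdmin();
        // Only the two-argument variant is overridden, yet the one-argument
        // call still works via the default method:
        System.out.println(admin.describeConfigs(List.of("broker-1")));
        // -> described 1 resource(s) with default-options
    }
}
```

Keeping behavior in the options-taking variant and delegating from the convenience overload is the design choice that makes the mock's extra override redundant.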
[GitHub] [kafka] ableegoldman commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup
ableegoldman commented on pull request #8856: URL: https://github.com/apache/kafka/pull/8856#issuecomment-643022606 Also failed with unrelated (extremely) flaky `ReassignPartitionsUnitTest.testModifyBrokerThrottles` and `SslSelectorTest.testCloseOldestConnection` (tickets created)
[jira] [Created] (KAFKA-10156) Flaky Test SslSelectorTest.testCloseOldestConnection
Sophie Blee-Goldman created KAFKA-10156: --- Summary: Flaky Test SslSelectorTest.testCloseOldestConnection Key: KAFKA-10156 URL: https://issues.apache.org/jira/browse/KAFKA-10156 Project: Kafka Issue Type: Bug Components: network Reporter: Sophie Blee-Goldman Failed twice on the same build with h3. Stacktrace java.lang.AssertionError: The idle connection should have been closed at org.junit.Assert.fail(Assert.java:89) at org.junit.Assert.assertTrue(Assert.java:42) at org.apache.kafka.common.network.SelectorTest.testCloseOldestConnection(SelectorTest.java:466) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.lang.Thread.run(Thread.java:834) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
ableegoldman commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-643022251 Java 11 and 8 failed with (extremely) flaky `ReassignPartitionsUnitTest.testModifyBrokerThrottles` -- https://issues.apache.org/jira/browse/KAFKA-10155 Java 14 failed with `SslSelectorTest.testCloseOldestConnection` -- https://issues.apache.org/jira/browse/KAFKA-10156
[jira] [Commented] (KAFKA-10147) MockAdminClient#describeConfigs(Collection) is unable to handle broker resource
[ https://issues.apache.org/jira/browse/KAFKA-10147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17133836#comment-17133836 ] Chia-Ping Tsai commented on KAFKA-10147: Marked as blocker as it breaks a test (ReassignPartitionsUnitTest#testModifyBrokerThrottles) > MockAdminClient#describeConfigs(Collection) is unable to > handle broker resource > --- > > Key: KAFKA-10147 > URL: https://issues.apache.org/jira/browse/KAFKA-10147 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Blocker > > MockAdminClient#describeConfigs(Collection) has new > implementation introduced by > https://github.com/apache/kafka/commit/48b56e533b3ff22ae0e2cf7fcc649e7df19f2b06. > It does not handle broker resource so > ReassignPartitionsUnitTest#testModifyBrokerThrottles throws NPE
[jira] [Created] (KAFKA-10155) Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles
Sophie Blee-Goldman created KAFKA-10155: --- Summary: Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles Key: KAFKA-10155 URL: https://issues.apache.org/jira/browse/KAFKA-10155 Project: Kafka Issue Type: Bug Components: admin Reporter: Sophie Blee-Goldman Seems to fail more often than not. I've seen this fail on almost every single build of the latest PRs h3. Stacktrace java.lang.NullPointerException at kafka.admin.ReassignPartitionsUnitTest.verifyBrokerThrottleResults(ReassignPartitionsUnitTest.scala:577) at kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles(ReassignPartitionsUnitTest.scala:540) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:564) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.lang.Thread.run(Thread.java:830) h3. 
Standard Output Current partition replica assignment \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]} Proposed partition reassignment configuration \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[0,3,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,3],"log_dirs":["any","any","any"]}]} Current partition replica assignment \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]} Proposed partition reassignment configuration \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10147) MockAdminClient#describeConfigs(Collection) is unable to handle broker resource
[ https://issues.apache.org/jira/browse/KAFKA-10147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10147: --- Priority: Blocker (was: Major) > MockAdminClient#describeConfigs(Collection) is unable to > handle broker resource > --- > > Key: KAFKA-10147 > URL: https://issues.apache.org/jira/browse/KAFKA-10147 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Blocker > > MockAdminClient#describeConfigs(Collection) has new > implementation introduced by > https://github.com/apache/kafka/commit/48b56e533b3ff22ae0e2cf7fcc649e7df19f2b06. > It does not handle broker resource so > ReassignPartitionsUnitTest#testModifyBrokerThrottles throws NPE
[jira] [Created] (KAFKA-10154) Issue in updating metadata if not exists during sending message to different topics
Dipti Gupta created KAFKA-10154: --- Summary: Issue in updating metadata if not exists during sending message to different topics Key: KAFKA-10154 URL: https://issues.apache.org/jira/browse/KAFKA-10154 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 2.5.0 Reporter: Dipti Gupta Project demonstrating the behaviour at: [https://github.com/DiptiGupta/kafka-producer-issue] I referenced the fixed issue https://issues.apache.org/jira/browse/KAFKA-8623, but on the latest version I'm getting the following exception when sending messages to different topics, i.e. Topic1 and Topic2. The exception occurs once, when metadata for *`Topic2`* doesn't exist yet. {code:java} org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic Topic2 not present in metadata after 1 ms. at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570) ~[spring-kafka-2.5.1.RELEASE.jar:2.5.1.RELEASE]{code}
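For context on the exception in this report: the producer's `max.block.ms` setting bounds how long `send()` blocks waiting for metadata of a topic the producer has not seen yet, and the "after 1 ms" in the stack trace suggests it was configured very low. A minimal sketch of the relevant configuration, using plain `java.util.Properties` (the broker address and values here are hypothetical, not taken from the report):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        // max.block.ms bounds how long send() waits for metadata of a topic
        // the producer has not fetched yet; a very low value (e.g. 1 ms, as
        // the report's stack trace implies) makes the first send to a new
        // topic fail with "Topic ... not present in metadata after 1 ms".
        props.put("max.block.ms", "60000"); // Kafka's default is 60 seconds
        return props;
    }
}
```

With the default value, the first `send()` to an unseen topic simply blocks until the metadata fetch completes instead of failing immediately.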
[GitHub] [kafka] andrewchoi5 commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception
andrewchoi5 commented on pull request #8479: URL: https://github.com/apache/kafka/pull/8479#issuecomment-643017385 > > In the catch case for ZooKeeperClientException, I have populated the responseMap with the topic partition and the `Errors.NETWORK_EXCEPTION`. If you suggest any other Error to be populated in this responseMap, please let me know and I will change it accordingly. > > I haven't looked at this code in a while so I may not have enough context at this point, but I don't think we should use the network exception error code - i.e., this isn't a network issue between the coordinator and broker but between the broker and zk. Also, there doesn't seem to be any active retry attempt from the controller to resend the request in this scenario. Correct -- I wasn't able to find a close-enough `Errors` value to populate, especially since there is none related to ZooKeeper in that class.
[GitHub] [kafka] jjkoshy commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception
jjkoshy commented on pull request #8479: URL: https://github.com/apache/kafka/pull/8479#issuecomment-643015314 > In the catch case for ZooKeeperClientException, I have populated the responseMap with the topic partition and the `Errors.NETWORK_EXCEPTION`. If you suggest any other Error to be populated in this responseMap, please let me know and I will change it accordingly. I haven't looked at this code in a while so I may not have enough context at this point, but I don't think we should use the network exception error code - i.e., this isn't a network issue between the coordinator and broker but between the broker and zk. Also, there doesn't seem to be any active retry attempt from the controller to resend the request in this scenario.
[GitHub] [kafka] skaundinya15 commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
skaundinya15 commented on pull request #8850: URL: https://github.com/apache/kafka/pull/8850#issuecomment-643012766 Thanks for the reviews @hachikuji. I updated the PR per your suggestions and left some follow-up comments for clarification. Whenever you're ready, the PR is ready for another review.
[GitHub] [kafka] skaundinya15 commented on a change in pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
skaundinya15 commented on a change in pull request #8850: URL: https://github.com/apache/kafka/pull/8850#discussion_r439156564 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1784,8 +1785,24 @@ class Log(@volatile private var _dir: File, private def deleteRetentionMsBreachedSegments(): Int = { if (config.retentionMs < 0) return 0 val startMs = time.milliseconds -deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs, - reason = s"retention time ${config.retentionMs}ms breach") + +def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = { + if (startMs - segment.largestTimestamp > config.retentionMs) { +segment.largestRecordTimestamp match { + case Some(ts) => +info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + + s" retentionMs breach. Largest record timestamp of segment is $ts") Review comment: > Also, we seem to have lost mention o I'll clarify the log message as you suggested. Could you explain what you meant by the above?
[GitHub] [kafka] andrewchoi5 commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception
andrewchoi5 commented on pull request #8479: URL: https://github.com/apache/kafka/pull/8479#issuecomment-643010366 Hi @junrao. Thank you for your review -- I have further synced up with @jjkoshy on this PR. The partition's new leader epoch update is actually happening after the point at which ZooKeeper Exception is thrown. Therefore, when the `createLogs` throws ZooKeeper Exception, the new leader epoch does not actually get updated. In the catch case for ZooKeeperClientException, I have populated the responseMap with the topic partition and the `Errors.NETWORK_EXCEPTION`. If you suggest any other Error to be populated in this responseMap, please let me know and I will change it accordingly.
[GitHub] [kafka] skaundinya15 commented on a change in pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
skaundinya15 commented on a change in pull request #8850: URL: https://github.com/apache/kafka/pull/8850#discussion_r439155424 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1702,11 +1702,12 @@ class Log(@volatile private var _dir: File, * (if there is one) and returns true iff it is deletable * @return The number of segments deleted */ - private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = { + private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean) = { Review comment: Wouldn't keeping the reason be redundant? For every segment we delete, we log exactly why we are deleting it and the details surrounding the deletion.
[GitHub] [kafka] guozhangwang commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup
guozhangwang commented on pull request #8856: URL: https://github.com/apache/kafka/pull/8856#issuecomment-643007770 test this please
[GitHub] [kafka] guozhangwang commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
guozhangwang commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-643007836 test this please
[GitHub] [kafka] guozhangwang commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
guozhangwang commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-643007754 test this please
[GitHub] [kafka] ableegoldman commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup
ableegoldman commented on pull request #8856: URL: https://github.com/apache/kafka/pull/8856#issuecomment-643003389 @guozhangwang @mjsax @vvcephei
[GitHub] [kafka] ableegoldman opened a new pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup
ableegoldman opened a new pull request #8856: URL: https://github.com/apache/kafka/pull/8856 1. KAFKA-10150: - always transition to SUSPENDED during `suspend`, no matter the current state - only call `prepareCommit` before closing if `task.commitNeeded` is true 2. Don't commit any consumed offsets during `handleAssignment` -- revoked active tasks (and any others that need committing) will be committed during `handleRevocation` so we only need to worry about cleaning them up in `handleAssignment` 3. KAFKA-10152: when recycling a task we should always commit consumed offsets (if any), but don't need to write the checkpoint (since changelog offsets are preserved across task transitions)
[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…
apovzner commented on a change in pull request #8768: URL: https://github.com/apache/kafka/pull/8768#discussion_r439143850 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def waitForConnectionSlot(listenerName: ListenerName, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = { counts.synchronized { - if (!connectionSlotAvailable(listenerName)) { + val startTimeMs = time.milliseconds() + val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0) + + if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) { val startNs = time.nanoseconds +val endThrottleTimeMs = startTimeMs + throttleTimeMs +var remainingThrottleTimeMs = throttleTimeMs do { - counts.wait(remainingThrottleTimeMs) Review comment: This is exactly the behavior proposed in KIP -- if we reach any limit (number of connections or connection rate), we need to wait. So, if there is no space for a new connection, and the delay due to rate limit has passed, we would have to wait for a connection slot. However, remember that if we are waiting on an inter-broker connection slot, the broker finds and closes a connection of another listener to accommodate inter-broker connection. See KIP-402.
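The wait logic discussed in the review above reduces to two checks: has the rate-limit throttle elapsed, and is a connection slot free. A simplified, hypothetical sketch of just that decision (names are illustrative, not the actual `ConnectionQuotas` code):

```java
public class ThrottleWait {
    // How much of the rate-limit throttle is still outstanding at nowMs.
    // The acceptor recorded the connection at startMs and was told to
    // throttle for throttleMs.
    public static long remainingThrottleMs(long startMs, long throttleMs, long nowMs) {
        long endThrottleMs = startMs + throttleMs;
        return Math.max(endThrottleMs - nowMs, 0);
    }

    // The acceptor must keep waiting while either the throttle has not
    // elapsed or no connection slot is available -- mirroring the
    // `throttleTimeMs > 0 || !connectionSlotAvailable(...)` condition above.
    public static boolean mustWait(long remainingThrottleMs, boolean slotAvailable) {
        return remainingThrottleMs > 0 || !slotAvailable;
    }
}
```

In the real code these checks run inside `counts.synchronized`, with `counts.wait(remainingThrottleTimeMs)` bounding each iteration so the thread wakes up when the throttle expires even if no slot is released.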
[GitHub] [kafka] guozhangwang commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
guozhangwang commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642992126 test this please
[GitHub] [kafka] mjsax opened a new pull request #8855: DO NOT MERGE
mjsax opened a new pull request #8855: URL: https://github.com/apache/kafka/pull/8855 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Tan updated KAFKA-9800: - Description: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for retry backoff ms. # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of attempts and returns the backoff/timeout value at the corresponding level. Thus, we can add a new class property to those classes containing retriable data in order to record the number of failed attempts. Changes: KafkaProducer: # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. So we can let the Accumulator calculate the new retry backoff for each bach when it enqueues them, to avoid instantiate the util class multiple times. # Transaction request (ApiKeys..*TXN). TxnRequestHandler will have a new class property of type `Long` to record the number of attempts. KafkaConsumer: # Some synchronous retry use cases. Record the failed attempts in the blocking loop. # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the current implementation is applying backoff to each topic partition, where the backoff value is kept by TopicPartitionState. 
Thus, TopicPartitionState will have the new property recording the number of attempts. Metadata: # Metadata lives as a singleton in many clients. Add a new property recording the number of attempts AdminClient: # AdminClient has its own request abstraction Call. The failed attempts are already kept by the abstraction. So probably clean the Call class logic a bit. There're other common usages look like client.poll(timeout), where the timeout passed in is the retry backoff value. We won't change these usages since its underlying logic is nioSelector.select(timeout) and nioSelector.selectNow(), which means if no interested op exists, the client will block retry backoff milliseconds. This is an optimization when there's no request that needs to be sent but the client is waiting for responses. Specifically, if the client fails the inflight requests before the retry backoff milliseconds passed, it still needs to wait until that amount of time passed, unless there's a new request need to be sent. was: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for retry backoff ms. # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of attempts and returns the backoff/timeout value at the corresponding level. Thus, we can add a new class property to those classes containing retriable data in order to record the number of failed attempts. 
Changes: KafkaProducer: # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. So probably clean up the logic a little bit by hiding the failed attempts property and the getter method by aggregation. # Transaction request (ApiKeys..*TXN). TxnRequestHandler will aggregate the context class and record each failed attempt. KafkaConsumer: # Some synchronous retry use cases. Record the failed attempts in the blocking loop. # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the current implementation is applying backoff to each topic partition, where the backoff value is kept by TopicPartitionState. Thus, TopicPartitionState can aggregate the context. Metadata: #
[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Tan updated KAFKA-9800: - Description: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for retry backoff ms. # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of attempts and returns the backoff/timeout value at the corresponding level. Thus, we can add a new class property to those classes containing retriable data in order to record the number of failed attempts. Changes: KafkaProducer: # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. So probably clean up the logic a little bit by hiding the failed attempts property and the getter method by aggregation. # Transaction request (ApiKeys..*TXN). TxnRequestHandler will aggregate the context class and record each failed attempt. KafkaConsumer: # Some synchronous retry use cases. Record the failed attempts in the blocking loop. # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the current implementation is applying backoff to each topic partition, where the backoff value is kept by TopicPartitionState. 
Thus, TopicPartitionState can aggregate the context. Metadata: # Metadata lives as a singleton in many clients. It can aggregate the context. AdminClient: # AdminClient has its own request abstraction Call. The failed attempts are kept by the abstraction. So probably clean the Call class logic a bit by aggregation. There're other common usages look like client.poll(timeout), where the timeout passed in is the retry backoff value. We won't change these usages since its underlying logic is nioSelector.select(timeout) and nioSelector.selectNow(), which means if no interested op exists, the client will block retry backoff milliseconds. This is an optimization when there's no request that needs to be sent but the client is waiting for responses. Specifically, if the client fails the inflight requests before the retry backoff milliseconds passed, it still needs to wait until that amount of time passed, unless there's a new request need to be sent. was: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for retry backoff ms. # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, we can make those classes containing retriable data aggregate an abstract class FailureContext. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. 
Changes: KafkaProducer: # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. So probably clean up the logic a little bit by hiding the failed attempts property and the getter method by aggregation. # Transaction request (ApiKeys..*TXN). TxnRequestHandler will aggregate the context class and record each failed attempt. KafkaConsumer: # Some synchronous retry use cases. Record the failed attempts in the blocking loop. # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the current implementation is applying backoff to each topic partition, where the backoff value is kept by TopicPartitionState. Thus, TopicPartitionState can aggregate the context. Metadata: # Metadata lives as a singleton in many clients. It c
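The exponential backoff utility described in the KAFKA-9800 design notes above can be sketched as follows. The class name, starting value, cap, and multiplier here are illustrative assumptions, not the actual KIP-580/KIP-601 implementation (which also adds jitter, omitted here for determinism):

```java
public class ExponentialBackoff {
    static final long INITIAL_BACKOFF_MS = 100;  // assumed starting backoff
    static final long MAX_BACKOFF_MS = 10_000;   // assumed cap
    static final double MULTIPLIER = 2.0;        // assumed growth factor

    // Return the backoff for the given number of prior failed attempts:
    // INITIAL * MULTIPLIER^attempts, capped at MAX. Callers bookkeep the
    // attempt count per request context, as the design notes describe.
    public static long backoffMs(int attempts) {
        double backoff = INITIAL_BACKOFF_MS * Math.pow(MULTIPLIER, attempts);
        return (long) Math.min(backoff, MAX_BACKOFF_MS);
    }
}
```

A request context would call `backoffMs(failedAttempts)` after each failure and sleep (or defer the retry) for that long before resending.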
[GitHub] [kafka] ableegoldman commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
ableegoldman commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642984741 Unrelated flaky test failed: `kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles`
[jira] [Resolved] (KAFKA-8686) Flaky test ExampleConnectIntegrationTest#testSinkConnector
[ https://issues.apache.org/jira/browse/KAFKA-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-8686. --- Resolution: Fixed This test class has been running dependably for some time. Closing as fixed. > Flaky test ExampleConnectIntegrationTest#testSinkConnector > -- > > Key: KAFKA-8686 > URL: https://issues.apache.org/jira/browse/KAFKA-8686 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, unit tests >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Assignee: Konstantine Karantasis >Priority: Major > Labels: flaky-test > > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/429/console > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector.test.stdout*20:09:20* > *20:09:20* > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest > > testSinkConnector FAILED*20:09:20* java.lang.AssertionError: Condition > not met within timeout 15000. Connector tasks were not assigned a partition > each.*20:09:20* at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)*20:09:20* > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)*20:09:20* > at > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector(ExampleConnectIntegrationTest.java:128) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kkonstantine commented on pull request #7224: KAFKA-8391, KAFKA-8661: Improve flaky Connect rebalance integration tests
kkonstantine commented on pull request #7224: URL: https://github.com/apache/kafka/pull/7224#issuecomment-642975105 @rhauch https://github.com/apache/kafka/pull/8805 seems to have fixed the root cause of these failures. The tests haven't failed since then. They seem more reliable now, at least compared to the previous state. You might want to consider closing or refactoring this PR to keep only the useful parts (logging / testing hooks) in favor of https://github.com/apache/kafka/pull/8805
[jira] [Resolved] (KAFKA-8661) Flaky Test RebalanceSourceConnectorsIntegrationTest#testStartTwoConnectors
[ https://issues.apache.org/jira/browse/KAFKA-8661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-8661. --- Resolution: Fixed The tests have been fixed and reenabled with [https://github.com/apache/kafka/pull/8805] and have been running dependably since then > Flaky Test RebalanceSourceConnectorsIntegrationTest#testStartTwoConnectors > -- > > Key: KAFKA-8661 > URL: https://issues.apache.org/jira/browse/KAFKA-8661 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, unit tests >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Randall Hauch >Priority: Critical > Labels: flaky-test > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/224/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testStartTwoConnectors/] > {quote}java.lang.AssertionError: Condition not met within timeout 3. > Connector tasks did not start in time. at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353) at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testStartTwoConnectors(RebalanceSourceConnectorsIntegrationTest.java:120){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-8661) Flaky Test RebalanceSourceConnectorsIntegrationTest#testStartTwoConnectors
[ https://issues.apache.org/jira/browse/KAFKA-8661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-8661: -- Fix Version/s: 2.5.1 2.4.2 2.3.2 > Flaky Test RebalanceSourceConnectorsIntegrationTest#testStartTwoConnectors > -- > > Key: KAFKA-8661 > URL: https://issues.apache.org/jira/browse/KAFKA-8661 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, unit tests >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Randall Hauch >Priority: Critical > Labels: flaky-test > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/224/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testStartTwoConnectors/] > {quote}java.lang.AssertionError: Condition not met within timeout 3. > Connector tasks did not start in time. at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353) at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testStartTwoConnectors(RebalanceSourceConnectorsIntegrationTest.java:120){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-8686) Flaky test ExampleConnectIntegrationTest#testSinkConnector
[ https://issues.apache.org/jira/browse/KAFKA-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis reassigned KAFKA-8686: - Assignee: Konstantine Karantasis > Flaky test ExampleConnectIntegrationTest#testSinkConnector > -- > > Key: KAFKA-8686 > URL: https://issues.apache.org/jira/browse/KAFKA-8686 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, unit tests >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Assignee: Konstantine Karantasis >Priority: Major > Labels: flaky-test > > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/429/console > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector.test.stdout*20:09:20* > *20:09:20* > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest > > testSinkConnector FAILED*20:09:20* java.lang.AssertionError: Condition > not met within timeout 15000. Connector tasks were not assigned a partition > each.*20:09:20* at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)*20:09:20* > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)*20:09:20* > at > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector(ExampleConnectIntegrationTest.java:128) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-8555) Flaky test ExampleConnectIntegrationTest#testSourceConnector
[ https://issues.apache.org/jira/browse/KAFKA-8555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis reassigned KAFKA-8555: - Assignee: Konstantine Karantasis > Flaky test ExampleConnectIntegrationTest#testSourceConnector > > > Key: KAFKA-8555 > URL: https://issues.apache.org/jira/browse/KAFKA-8555 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Boyang Chen >Assignee: Konstantine Karantasis >Priority: Major > Attachments: log-job139.txt, log-job141.txt, log-job23145.txt, > log-job23215.txt, log-job6046.txt > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/22798/console] > *02:03:21* > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector.test.stdout*02:03:21* > *02:03:21* > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest > > testSourceConnector FAILED*02:03:21* > org.apache.kafka.connect.errors.DataException: Insufficient records committed > by connector simple-conn in 15000 millis. Records expected=2000, > actual=1013*02:03:21* at > org.apache.kafka.connect.integration.ConnectorHandle.awaitCommits(ConnectorHandle.java:188)*02:03:21* > at > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector(ExampleConnectIntegrationTest.java:181)*02:03:21* -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
[ https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-8391. --- Resolution: Fixed The tests have been fixed and reenabled with [https://github.com/apache/kafka/pull/8805] and have been running dependably since then > Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector > --- > > Key: KAFKA-8391 > URL: https://issues.apache.org/jira/browse/KAFKA-8391 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Randall Hauch >Priority: Critical > Labels: flaky-test > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > Attachments: 100-gradle-builds.tar > > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/] > {quote}java.lang.AssertionError: Condition not met within timeout 3. > Connector tasks did not stop in time. at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
[ https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-8391: -- Fix Version/s: 2.5.1 2.4.2 2.3.2 > Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector > --- > > Key: KAFKA-8391 > URL: https://issues.apache.org/jira/browse/KAFKA-8391 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Randall Hauch >Priority: Critical > Labels: flaky-test > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > Attachments: 100-gradle-builds.tar > > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/] > {quote}java.lang.AssertionError: Condition not met within timeout 3. > Connector tasks did not stop in time. at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
mjsax commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642968031
[GitHub] [kafka] mjsax commented on pull request #8852: MINOR: code cleanup for Kafka Streams task classes
mjsax commented on pull request #8852: URL: https://github.com/apache/kafka/pull/8852#issuecomment-642966020 Retest this please.
[GitHub] [kafka] abbccdda commented on a change in pull request #8853: KAFKA-10147 MockAdminClient#describeConfigs(Collection
abbccdda commented on a change in pull request #8853: URL: https://github.com/apache/kafka/pull/8853#discussion_r439101876 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java ## @@ -560,10 +529,14 @@ synchronized private Config getResourceDescription(ConfigResource resource) { } case TOPIC: { TopicMetadata topicMetadata = allTopics.get(resource.name()); -if (topicMetadata == null) { -throw new UnknownTopicOrPartitionException(); +if (topicMetadata != null && !topicMetadata.markedForDeletion) { +if (topicMetadata.fetchesRemainingUntilVisible > 0) topicMetadata.fetchesRemainingUntilVisible--; Review comment: nit: topicMetadata.fetchesRemainingUntilVisible = Math.max(0, topicMetadata.fetchesRemainingUntilVisible -1); ## File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java ## @@ -365,51 +366,6 @@ synchronized public DescribeTopicsResult describeTopics(Collection topic return new DescribeTopicsResult(topicDescriptions); } -@Override -public DescribeConfigsResult describeConfigs(Collection resources) { Review comment: Maybe just deprecate public method instead of removing it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
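The review nit above (replacing a guarded decrement with a `Math.max` clamp) can be shown in isolation. The `ClampedDecrement` class below is a hypothetical stand-in for the `fetchesRemainingUntilVisible` counter, not the actual MockAdminClient code:

```java
// Hypothetical sketch of the review nit: for a non-negative countdown,
// clamping with Math.max is equivalent to the guarded decrement.
public class ClampedDecrement {
    // guarded form, as in the original patch
    public static int guarded(int remaining) {
        if (remaining > 0) remaining--;
        return remaining;
    }

    // clamped form, as suggested in the review comment
    public static int clamped(int remaining) {
        return Math.max(0, remaining - 1);
    }

    public static void main(String[] args) {
        // the two forms agree for every non-negative counter value
        for (int r = 0; r <= 5; r++) {
            if (guarded(r) != clamped(r)) {
                throw new AssertionError("mismatch at " + r);
            }
        }
        System.out.println("equivalent for non-negative counters");
    }
}
```

Note the equivalence only holds for non-negative counters, which is the case here since the field counts remaining fetches.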
[GitHub] [kafka] guozhangwang commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management
guozhangwang commented on a change in pull request #8833: URL: https://github.com/apache/kafka/pull/8833#discussion_r439093372 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -210,9 +214,8 @@ public void handleAssignment(final Map> activeTasks, tasksToRecycle.add(task); } else { try { -task.prepareCloseClean(); -final Map committableOffsets = task -.committableOffsetsAndMetadata(); +task.suspend(); Review comment: We may not call `handleRevocation` before calling `handleAssignment` so the task to close may not be in SUSPENDED state yet, and hence do close them we need to commit their states. For other tasks, they are not necessarily committing but I think the point was, that since we are going to send one `commit` request anyways so just commit for everyone --- note that flushing can indeed be skipped, which is what KAFKA-9450 covers This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management
ableegoldman commented on a change in pull request #8833: URL: https://github.com/apache/kafka/pull/8833#discussion_r439086416 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -210,9 +214,8 @@ public void handleAssignment(final Map> activeTasks, tasksToRecycle.add(task); } else { try { -task.prepareCloseClean(); -final Map committableOffsets = task -.committableOffsetsAndMetadata(); +task.suspend(); Review comment: @mjsax @guozhangwang why do we need to commit at all during `handleAssignment`? Shouldn't we have already committed all tasks that _need_ to be committed during `handleRevocation`? That's not exactly a bug, I'm just wondering if it's necessary? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10149) Do not prevent automatic preferred election when reassignment in progress
[ https://issues.apache.org/jira/browse/KAFKA-10149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bob Barrett reassigned KAFKA-10149: --- Assignee: Bob Barrett > Do not prevent automatic preferred election when reassignment in progress > - > > Key: KAFKA-10149 > URL: https://issues.apache.org/jira/browse/KAFKA-10149 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Bob Barrett >Priority: Major > > Currently the controller will not do preferred leader elections automatically > when a reassignment is in progress. If a broker crashes or is restarted with > a reassignment in progress, this leaves the leadership massively skewed until > the reassignment completes. I am not sure if there is a good reason for this, > but it seems not ideal.
[jira] [Updated] (KAFKA-10153) Documentation for the Errant Record Reporter
[ https://issues.apache.org/jira/browse/KAFKA-10153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aakash Shah updated KAFKA-10153: Component/s: KafkaConnect > Documentation for the Errant Record Reporter > > > Key: KAFKA-10153 > URL: https://issues.apache.org/jira/browse/KAFKA-10153 > Project: Kafka > Issue Type: Task > Components: documentation, KafkaConnect >Affects Versions: 2.6.0 >Reporter: Aakash Shah >Assignee: Aakash Shah >Priority: Major > > Add documentation for the new Errant Record Reporter API.
[jira] [Created] (KAFKA-10153) Documentation for the Errant Record Reporter
Aakash Shah created KAFKA-10153: --- Summary: Documentation for the Errant Record Reporter Key: KAFKA-10153 URL: https://issues.apache.org/jira/browse/KAFKA-10153 Project: Kafka Issue Type: Task Components: documentation Affects Versions: 2.6.0 Reporter: Aakash Shah Assignee: Aakash Shah Add documentation for the new Errant Record Reporter API.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management
ableegoldman commented on a change in pull request #8833: URL: https://github.com/apache/kafka/pull/8833#discussion_r439075789 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -504,88 +438,66 @@ public void update(final Set topicPartitions, final Map - * the following order must be followed: - * 1. first close topology to make sure all cached records in the topology are processed - * 2. then flush the state, send any left changelog records - * 3. then flush the record collector - * - * - * @param cleanshut down cleanly (ie, incl. flush) if {@code true} -- - * otherwise, just close open resources - * @throws TaskMigratedException if the task producer got fenced (EOS) - */ -private void prepareClose(final boolean clean) { -// Reset any previously scheduled checkpoint. -checkpoint = null; - +private void maybeScheduleCheckpoint() { switch (state()) { -case CREATED: -// the task is created and not initialized, just re-write the checkpoint file -scheduleCheckpoint(emptyMap()); +case RESTORING: +this.checkpoint = checkpointableOffsets(); + break; case RUNNING: -closeTopology(clean); - -if (clean) { -stateMgr.flush(); -recordCollector.flush(); -scheduleCheckpoint(checkpointableOffsets()); -} else { -executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log); +if (!eosEnabled) { +this.checkpoint = checkpointableOffsets(); } break; -case RESTORING: -executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log); -scheduleCheckpoint(emptyMap()); +case SUSPENDED: +this.checkpoint = checkpointableOffsets(); break; -case SUSPENDED: +case CREATED: case CLOSED: -// not need to checkpoint, since when suspending we've already committed the state -break; +throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id); default: -throw new IllegalStateException("Unknown state " + state() + " while prepare closing active task " + id); +throw new 
IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id); } } -private void scheduleCheckpoint(final Map checkpoint) { -this.checkpoint = checkpoint; -} - private void writeCheckpointIfNeed() { +if (commitNeeded) { Review comment: Too late, I already created a ticket for it 🙂 But after starting to work on it, I agree, they should be addressed in one PR This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management
guozhangwang commented on a change in pull request #8833: URL: https://github.com/apache/kafka/pull/8833#discussion_r439074869 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -504,88 +438,66 @@ public void update(final Set topicPartitions, final Map - * the following order must be followed: - * 1. first close topology to make sure all cached records in the topology are processed - * 2. then flush the state, send any left changelog records - * 3. then flush the record collector - * - * - * @param cleanshut down cleanly (ie, incl. flush) if {@code true} -- - * otherwise, just close open resources - * @throws TaskMigratedException if the task producer got fenced (EOS) - */ -private void prepareClose(final boolean clean) { -// Reset any previously scheduled checkpoint. -checkpoint = null; - +private void maybeScheduleCheckpoint() { switch (state()) { -case CREATED: -// the task is created and not initialized, just re-write the checkpoint file -scheduleCheckpoint(emptyMap()); +case RESTORING: +this.checkpoint = checkpointableOffsets(); + break; case RUNNING: -closeTopology(clean); - -if (clean) { -stateMgr.flush(); -recordCollector.flush(); -scheduleCheckpoint(checkpointableOffsets()); -} else { -executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log); +if (!eosEnabled) { +this.checkpoint = checkpointableOffsets(); } break; -case RESTORING: -executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log); -scheduleCheckpoint(emptyMap()); +case SUSPENDED: +this.checkpoint = checkpointableOffsets(); break; -case SUSPENDED: +case CREATED: case CLOSED: -// not need to checkpoint, since when suspending we've already committed the state -break; +throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id); default: -throw new IllegalStateException("Unknown state " + state() + " while prepare closing active task " + id); +throw new 
IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id); } } -private void scheduleCheckpoint(final Map checkpoint) { -this.checkpoint = checkpoint; -} - private void writeCheckpointIfNeed() { +if (commitNeeded) { Review comment: Nice catch. Let's just fix it along with 10150? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10152) Attempt to write checkpoint without first committing during recycle
[ https://issues.apache.org/jira/browse/KAFKA-10152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-10152: --- Assignee: Sophie Blee-Goldman > Attempt to write checkpoint without first committing during recycle > --- > > Key: KAFKA-10152 > URL: https://issues.apache.org/jira/browse/KAFKA-10152 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.6.0 > > > Seen causing failure of > `SmokeTestDriverIntegrationTest#shouldWorkWithRebalance` locally: > {code:java} > Caused by: java.lang.IllegalStateException: A checkpoint should only be > written if no commit is needed. at > org.apache.kafka.streams.processor.internals.StreamTask.writeCheckpointIfNeed(StreamTask.java:534) > at > org.apache.kafka.streams.processor.internals.StreamTask.closeAndRecycleState(StreamTask.java:482) > at > org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:115) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:288) > {code} > See comment here: https://github.com/apache/kafka/pull/8833/files#r438953766
[GitHub] [kafka] vvcephei commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes
vvcephei commented on pull request #8764: URL: https://github.com/apache/kafka/pull/8764#issuecomment-642902146 I've noticed that if you complain, you tend to get results...
[GitHub] [kafka] vvcephei commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes
vvcephei commented on pull request #8764: URL: https://github.com/apache/kafka/pull/8764#issuecomment-642902034 Ah, there we go :)
[GitHub] [kafka] vvcephei commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes
vvcephei commented on pull request #8764: URL: https://github.com/apache/kafka/pull/8764#issuecomment-642901512 Hey @bellemare , Thanks so much for the update! Jenkins has been a bit lazy recently, so I'll probably be pinging it for a while...
[GitHub] [kafka] vvcephei commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes
vvcephei commented on pull request #8764: URL: https://github.com/apache/kafka/pull/8764#issuecomment-642901027
[GitHub] [kafka] vvcephei commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes
vvcephei commented on pull request #8764: URL: https://github.com/apache/kafka/pull/8764#issuecomment-642900612
[jira] [Created] (KAFKA-10152) Attempt to write checkpoint without first committing during recycle
Sophie Blee-Goldman created KAFKA-10152: --- Summary: Attempt to write checkpoint without first committing during recycle Key: KAFKA-10152 URL: https://issues.apache.org/jira/browse/KAFKA-10152 Project: Kafka Issue Type: Bug Components: streams Reporter: Sophie Blee-Goldman Fix For: 2.6.0 Seen causing failure of `SmokeTestDriverIntegrationTest#shouldWorkWithRebalance` locally: {code:java} Caused by: java.lang.IllegalStateException: A checkpoint should only be written if no commit is needed. at org.apache.kafka.streams.processor.internals.StreamTask.writeCheckpointIfNeed(StreamTask.java:534) at org.apache.kafka.streams.processor.internals.StreamTask.closeAndRecycleState(StreamTask.java:482) at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:115) at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:288) {code} See comment here: https://github.com/apache/kafka/pull/8833/files#r438953766
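The IllegalStateException in the stack trace above can be reduced to a minimal guard. `CheckpointGuard` below is a hypothetical sketch of the invariant that was violated during recycling, not the actual StreamTask implementation:

```java
// Hypothetical sketch of the invariant behind KAFKA-10152: a checkpoint
// must not be written while a commit is still pending. Not the actual
// StreamTask code.
public class CheckpointGuard {
    private boolean commitNeeded = true;  // becomes false once commit() runs

    public void commit() {
        // ... flush state and offsets ...
        commitNeeded = false;
    }

    public void writeCheckpointIfNeed() {
        if (commitNeeded) {
            throw new IllegalStateException(
                "A checkpoint should only be written if no commit is needed.");
        }
        // ... write the checkpoint file ...
    }

    public static void main(String[] args) {
        CheckpointGuard task = new CheckpointGuard();
        boolean threw = false;
        try {
            // recycle path from the stack trace: checkpoint before committing
            task.writeCheckpointIfNeed();
        } catch (IllegalStateException e) {
            threw = true;
        }
        if (!threw) throw new AssertionError("expected IllegalStateException");
        task.commit();
        task.writeCheckpointIfNeed();  // fine once the commit has happened
        System.out.println("checkpoint written after commit");
    }
}
```

In this reduction, the fix tracked by the ticket amounts to ensuring the commit happens before the recycle path calls the checkpoint writer.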
[jira] [Updated] (KAFKA-10151) Flaky Test StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi
[ https://issues.apache.org/jira/browse/KAFKA-10151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-10151: Summary: Flaky Test StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi (was: Flaky Test StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapiStoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi) > Flaky Test > StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi > -- > > Key: KAFKA-10151 > URL: https://issues.apache.org/jira/browse/KAFKA-10151 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: flaky-test, integration-test > Attachments: > StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi.stdout.rtf > > > I've started seeing this fail in the past week or so. Checked out the logs > and there's nothing obviously wrong (ie no ERROR or exception) so it might > just be flaky? > > java.lang.AssertionError: Condition not met within timeout 6. Could not > get expected result in time. 
at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388) at > org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.verifyCountWithTimestamp(StoreUpgradeIntegrationTest.java:367) > at > org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:183) > at > org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:109) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition
hachikuji commented on a change in pull request #8841: URL: https://github.com/apache/kafka/pull/8841#discussion_r439009094 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ## @@ -978,7 +1000,7 @@ public boolean hasValidPosition() { @Override public boolean hasPosition() { Review comment: Sure, but that led to the opposite problem, in which the enum was inconsistent with the state. In regard to position, I think we should handle this at transition time as mentioned below. If we ensure that position is not null in the fetching and validating states, then I don't see a problem changing `hasPosition` to check it directly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
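The idea hachikuji describes, enforcing the non-null position at transition time so that `hasPosition` can simply check the field, can be sketched as follows. `FetchStateSketch` is a hypothetical illustration, not the actual `SubscriptionState` code:

```java
import java.util.Objects;

// Hypothetical sketch: validate the invariant when entering a state,
// so later queries can trust the field directly. Not the real
// SubscriptionState implementation.
public class FetchStateSketch {
    enum State { INITIALIZING, FETCHING }

    private State state = State.INITIALIZING;
    private Long position;  // stand-in for the consumer's fetch position

    // enforce non-null at transition time, not on every hasPosition() call
    public void transitionToFetching(Long newPosition) {
        this.position = Objects.requireNonNull(newPosition,
            "position must be set before entering FETCHING");
        this.state = State.FETCHING;
    }

    public boolean hasPosition() {
        // safe once the transition-time check guarantees the invariant
        return position != null;
    }

    public static void main(String[] args) {
        FetchStateSketch s = new FetchStateSketch();
        if (s.hasPosition()) throw new AssertionError("no position yet");
        s.transitionToFetching(42L);
        if (!s.hasPosition()) throw new AssertionError("position expected");
        System.out.println("invariant holds after transition");
    }
}
```

The design choice being discussed is where the check lives: validating at the transition keeps the enum state and the position field from drifting apart, which was the inconsistency the original patch ran into.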
[jira] [Created] (KAFKA-10151) Flaky Test StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapiStoreUpgradeIntegrationTest.shouldMigratePersistentKeyValue
Sophie Blee-Goldman created KAFKA-10151: --- Summary: Flaky Test StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapiStoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi Key: KAFKA-10151 URL: https://issues.apache.org/jira/browse/KAFKA-10151 Project: Kafka Issue Type: Bug Components: streams Reporter: Sophie Blee-Goldman Attachments: StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi.stdout.rtf I've started seeing this fail in the past week or so. Checked out the logs and there's nothing obviously wrong (ie no ERROR or exception) so it might just be flaky? java.lang.AssertionError: Condition not met within timeout 6. Could not get expected result in time. at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388) at org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.verifyCountWithTimestamp(StoreUpgradeIntegrationTest.java:367) at org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:183) at org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:109) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management
ableegoldman commented on a change in pull request #8833: URL: https://github.com/apache/kafka/pull/8833#discussion_r439004982 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -504,88 +438,66 @@ public void update(final Set topicPartitions, final Map - * the following order must be followed: - * 1. first close topology to make sure all cached records in the topology are processed - * 2. then flush the state, send any left changelog records - * 3. then flush the record collector - * - * - * @param cleanshut down cleanly (ie, incl. flush) if {@code true} -- - * otherwise, just close open resources - * @throws TaskMigratedException if the task producer got fenced (EOS) - */ -private void prepareClose(final boolean clean) { -// Reset any previously scheduled checkpoint. -checkpoint = null; - +private void maybeScheduleCheckpoint() { switch (state()) { -case CREATED: -// the task is created and not initialized, just re-write the checkpoint file -scheduleCheckpoint(emptyMap()); +case RESTORING: +this.checkpoint = checkpointableOffsets(); + break; case RUNNING: -closeTopology(clean); - -if (clean) { -stateMgr.flush(); -recordCollector.flush(); -scheduleCheckpoint(checkpointableOffsets()); -} else { -executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log); +if (!eosEnabled) { +this.checkpoint = checkpointableOffsets(); } break; -case RESTORING: -executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log); -scheduleCheckpoint(emptyMap()); +case SUSPENDED: +this.checkpoint = checkpointableOffsets(); break; -case SUSPENDED: +case CREATED: case CLOSED: -// not need to checkpoint, since when suspending we've already committed the state -break; +throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id); default: -throw new IllegalStateException("Unknown state " + state() + " while prepare closing active task " + id); +throw new 
IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id); } } -private void scheduleCheckpoint(final Map checkpoint) { -this.checkpoint = checkpoint; -} - private void writeCheckpointIfNeed() { +if (commitNeeded) { Review comment: Just saw this in the `SmokeTestDriverIntegrationTest#shouldWorkWithRebalance` -- not sure if it merits a separate ticket or can just be fixed together with https://issues.apache.org/jira/browse/KAFKA-10150 ? ``` Caused by: java.lang.IllegalStateException: A checkpoint should only be written if no commit is needed. at org.apache.kafka.streams.processor.internals.StreamTask.writeCheckpointIfNeed(StreamTask.java:534) at org.apache.kafka.streams.processor.internals.StreamTask.closeAndRecycleState(StreamTask.java:482) at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:115) at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:288) ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -504,88 +438,66 @@ public void update(final Set topicPartitions, final Map - * the following order must be followed: - * 1. first close topology to make sure all cached records in the topology are processed - * 2. then flush the state, send any left changelog records - * 3. then flush the record collector - * - * - * @param cleanshut down cleanly (ie, incl. flush) if {@code true} -- - * otherwise, just close open resources - * @throws TaskMigratedException if the task producer got fenced (EOS) - */ -private void prepareClose(final boolean clean) { -// Reset any previously scheduled checkpoint. 
-checkpoint = null; - +private void maybeScheduleCheckpoint() { switch (state()) { -case CREATED: -// the task is created and not initialized, just re-write the checkpoint file -scheduleCheckpoint(emptyMap()); +case RESTORING: +this.checkpoint = checkpointableOffsets(); + break; case RUNNING: -
[GitHub] [kafka] hachikuji commented on a change in pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
hachikuji commented on a change in pull request #8850: URL: https://github.com/apache/kafka/pull/8850#discussion_r438997622 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1702,11 +1702,12 @@ class Log(@volatile private var _dir: File, * (if there is one) and returns true iff it is deletable * @return The number of segments deleted */ - private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = { + private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean) = { Review comment: Can we keep the reason? On second thought, maybe it's fine to leave this as is. ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1794,20 +1811,29 @@ class Log(@volatile private var _dir: File, def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = { if (diff - segment.size >= 0) { diff -= segment.size +info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + Review comment: In addition, it might be useful to know the total log size. Maybe we could include `size - diff` as the size after deletion? ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1784,8 +1785,24 @@ class Log(@volatile private var _dir: File, private def deleteRetentionMsBreachedSegments(): Int = { if (config.retentionMs < 0) return 0 val startMs = time.milliseconds -deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs, - reason = s"retention time ${config.retentionMs}ms breach") + +def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = { + if (startMs - segment.largestTimestamp > config.retentionMs) { +segment.largestRecordTimestamp match { + case Some(ts) => +info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + + s" retentionMs breach. Largest record timestamp of segment is $ts") Review comment: nit: could we make the connection clearer? How about this? 
```scala
info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" +
  s" retentionMs breach based on the largest record timestamp from the segment, which is $ts")
```
Also, we seem to have lost mention o
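The retention check being discussed boils down to a predicate over segment timestamps: a segment is deletable when its newest record is older than the configured retention window. A standalone sketch of that logic, where `SegmentInfo` is a hypothetical stand-in for the fields the real `LogSegment` exposes:

```java
public final class RetentionSketch {
    // Hypothetical stand-in for the fields the predicate reads from LogSegment.
    public static final class SegmentInfo {
        final long baseOffset;
        final long largestTimestamp;

        public SegmentInfo(long baseOffset, long largestTimestamp) {
            this.baseOffset = baseOffset;
            this.largestTimestamp = largestTimestamp;
        }
    }

    // A segment breaches retentionMs when its newest record is older than
    // the retention window relative to "now". A negative retention setting
    // means records are retained indefinitely, so nothing is deletable.
    public static boolean breachesRetentionMs(SegmentInfo segment,
                                              long nowMs,
                                              long retentionMs) {
        if (retentionMs < 0) {
            return false;
        }
        return nowMs - segment.largestTimestamp > retentionMs;
    }
}
```

This also shows why the review asks for the largest record timestamp in the log line: the deletion decision is made against that timestamp, so logging it lets an operator verify the breach arithmetic directly.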
[GitHub] [kafka] tombentley commented on pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs
tombentley commented on pull request #8312: URL: https://github.com/apache/kafka/pull/8312#issuecomment-642863307 @mimaison done.
[jira] [Resolved] (KAFKA-10027) Implement read path for feature versioning scheme
[ https://issues.apache.org/jira/browse/KAFKA-10027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-10027. - Fix Version/s: 2.7.0 Assignee: Kowshik Prakasam Resolution: Fixed merged the PR to trunk > Implement read path for feature versioning scheme > - > > Key: KAFKA-10027 > URL: https://issues.apache.org/jira/browse/KAFKA-10027 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > Fix For: 2.7.0 > > > Goal is to implement various classes and integration for the read path of the > feature versioning system > ([KIP-584|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]). > The ultimate plan is that the cluster-wide *finalized* features information > is going to be stored in ZK under the node {{/feature}}. The read path > implemented in this PR is centered around reading this *finalized* features > information from ZK, and processing it inside the Broker. > > Here is a summary of what's needed for this Jira (a lot of it is *new* > classes): > * A facility is provided in the broker to declare its supported features, > and advertise its supported features via its own {{BrokerIdZNode}} under a > {{features}} key. > * A facility is provided in the broker to listen to and propagate > cluster-wide *finalized* feature changes from ZK. > * When new *finalized* features are read from ZK, feature incompatibilities > are detected by comparing against the broker's own supported features. > * {{ApiVersionsResponse}} is now served containing supported and finalized > feature information (using the newly added tagged fields).
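The incompatibility detection described above — comparing the cluster-wide finalized feature versions against the broker's own supported ranges — can be sketched as follows. The names are illustrative, not the actual KIP-584 classes:

```java
import java.util.HashMap;
import java.util.Map;

public final class FeatureCompatSketch {
    // Hypothetical supported range [min, max] a broker declares for one feature.
    public static final class VersionRange {
        final short min;
        final short max;

        public VersionRange(short min, short max) {
            this.min = min;
            this.max = max;
        }

        boolean contains(short v) {
            return v >= min && v <= max;
        }
    }

    // Returns the finalized features this broker cannot support: a feature is
    // incompatible if the broker does not declare it at all, or if the
    // finalized version falls outside the broker's supported range.
    public static Map<String, Short> incompatibilities(Map<String, Short> finalized,
                                                       Map<String, VersionRange> supported) {
        Map<String, Short> incompatible = new HashMap<>();
        for (Map.Entry<String, Short> e : finalized.entrySet()) {
            VersionRange range = supported.get(e.getKey());
            if (range == null || !range.contains(e.getValue())) {
                incompatible.put(e.getKey(), e.getValue());
            }
        }
        return incompatible;
    }
}
```

In the read path described in the ticket, a check of this shape would run whenever new finalized features arrive from ZK, before the broker accepts them.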
[GitHub] [kafka] junrao merged pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)
junrao merged pull request #8680: URL: https://github.com/apache/kafka/pull/8680
[GitHub] [kafka] junrao commented on pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)
junrao commented on pull request #8680: URL: https://github.com/apache/kafka/pull/8680#issuecomment-642855293 The test failures seem unrelated. Merging this to trunk.
[jira] [Created] (KAFKA-10150) IllegalStateException when revoking task in CREATED state
Sophie Blee-Goldman created KAFKA-10150: --- Summary: IllegalStateException when revoking task in CREATED state Key: KAFKA-10150 URL: https://issues.apache.org/jira/browse/KAFKA-10150 Project: Kafka Issue Type: Bug Components: streams Reporter: Sophie Blee-Goldman Assignee: Sophie Blee-Goldman Fix For: 2.6.0 Seen killing threads in soak. During handleAssignment we call #suspend and #prepareCommit on all revoked tasks. Only RUNNING tasks are transitioned to SUSPENDED in #suspend, but #prepareCommit will assert that the state is either RUNNING or SUSPENDED. So tasks that get revoked while in CREATED will hit an IllegalStateException during prepareCommit: {code:java} [2020-06-11T00:39:57-07:00] (streams-soak-KAFKA-10144-corrupted-standby-fix-eos-beta-broker-trunk_soak_i-09ef677ab88c040c8_streamslog) [2020-06-11 07:39:56,852] ERROR [stream-soak-test-90c4a6b9-e730-4211-9edb-146b74c54c8f-StreamThread-2] stream-thread [stream-soak-test-90c4a6b9-e730-4211-9edb-146b74c54c8f-StreamThread-2] Failed to close task 1_2 cleanly. Attempting to close remaining tasks before re-throwing: (org.apache.kafka.streams.processor.internals.TaskManager)[2020-06-11T00:39:57-07:00] (streams-soak-KAFKA-10144-corrupted-standby-fix-eos-beta-broker-trunk_soak_i-09ef677ab88c040c8_streamslog) [2020-06-11 07:39:56,852] ERROR [stream-soak-test-90c4a6b9-e730-4211-9edb-146b74c54c8f-StreamThread-2] stream-thread [stream-soak-test-90c4a6b9-e730-4211-9edb-146b74c54c8f-StreamThread-2] Failed to close task 1_2 cleanly. 
Attempting to close remaining tasks before re-throwing: (org.apache.kafka.streams.processor.internals.TaskManager)[2020-06-11T00:39:57-07:00] (streams-soak-KAFKA-10144-corrupted-standby-fix-eos-beta-broker-trunk_soak_i-09ef677ab88c040c8_streamslog) java.lang.IllegalStateException: Illegal state CREATED while preparing standby task 1_2 for committing at org.apache.kafka.streams.processor.internals.StandbyTask.prepareCommit(StandbyTask.java:142) at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:228) at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1350) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1263) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1229) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1204) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:763) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:623) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509) {code}
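The bug above is a state-machine guard mismatch: `#suspend` only transitions RUNNING tasks to SUSPENDED, while `#prepareCommit` asserts the state is RUNNING or SUSPENDED, so a task revoked while still CREATED trips the assertion. A minimal sketch of the mismatch and one possible fix — treating CREATED as a no-op commit — using a hypothetical `TaskSketch`, not the real `StreamTask`/`StandbyTask`:

```java
public class TaskSketch {
    public enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private State state = State.CREATED;

    // Only RUNNING tasks are transitioned; a CREATED task stays CREATED,
    // which is what sets up the IllegalStateException in the ticket.
    public void suspend() {
        if (state == State.RUNNING) {
            state = State.SUSPENDED;
        }
    }

    // Sketched fix: a never-initialized task has processed nothing, so there
    // is nothing to commit -- return instead of throwing for CREATED.
    public void prepareCommit() {
        if (state == State.CREATED) {
            return;
        }
        if (state != State.RUNNING && state != State.SUSPENDED) {
            throw new IllegalStateException(
                "Illegal state " + state + " while preparing task for committing");
        }
        // ... flush state and collect committable offsets here ...
    }

    public State state() {
        return state;
    }
}
```

Whether the real fix widens the `prepareCommit` guard or makes `suspend` transition CREATED tasks too is a design choice; the sketch only illustrates why revoking a CREATED task currently throws.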
[jira] [Created] (KAFKA-10149) Do not prevent automatic preferred election when reassignment in progress
Jason Gustafson created KAFKA-10149: --- Summary: Do not prevent automatic preferred election when reassignment in progress Key: KAFKA-10149 URL: https://issues.apache.org/jira/browse/KAFKA-10149 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Currently the controller will not do preferred leader elections automatically when a reassignment is in progress. If a broker crashes or is restarted with a reassignment in progress, this leaves the leadership massively skewed until the reassignment completes. I am not sure if there is a good reason for this, but it seems not ideal.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management
ableegoldman commented on a change in pull request #8833: URL: https://github.com/apache/kafka/pull/8833#discussion_r438957621 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -504,88 +438,66 @@ public void update(final Set topicPartitions, final Map - * the following order must be followed: - * 1. first close topology to make sure all cached records in the topology are processed - * 2. then flush the state, send any left changelog records - * 3. then flush the record collector - * - * - * @param cleanshut down cleanly (ie, incl. flush) if {@code true} -- - * otherwise, just close open resources - * @throws TaskMigratedException if the task producer got fenced (EOS) - */ -private void prepareClose(final boolean clean) { -// Reset any previously scheduled checkpoint. -checkpoint = null; - +private void maybeScheduleCheckpoint() { switch (state()) { -case CREATED: -// the task is created and not initialized, just re-write the checkpoint file -scheduleCheckpoint(emptyMap()); +case RESTORING: +this.checkpoint = checkpointableOffsets(); + break; case RUNNING: -closeTopology(clean); - -if (clean) { -stateMgr.flush(); -recordCollector.flush(); -scheduleCheckpoint(checkpointableOffsets()); -} else { -executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log); +if (!eosEnabled) { +this.checkpoint = checkpointableOffsets(); } break; -case RESTORING: -executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log); -scheduleCheckpoint(emptyMap()); +case SUSPENDED: +this.checkpoint = checkpointableOffsets(); break; -case SUSPENDED: +case CREATED: case CLOSED: -// not need to checkpoint, since when suspending we've already committed the state -break; +throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id); default: -throw new IllegalStateException("Unknown state " + state() + " while prepare closing active task " + id); +throw new 
IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id); } } -private void scheduleCheckpoint(final Map checkpoint) { -this.checkpoint = checkpoint; -} - private void writeCheckpointIfNeed() { +if (commitNeeded) { Review comment: Maybe I'm thinking of standby tasks (ie we only skip checkpointing for recycled standbys). For active tasks, we should probably commit them before recycling right? Or is it ok to skip committing altogether 🤔
[GitHub] [kafka] ableegoldman commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management
ableegoldman commented on a change in pull request #8833: URL: https://github.com/apache/kafka/pull/8833#discussion_r438953766 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -504,88 +438,66 @@ public void update(final Set topicPartitions, final Map - * the following order must be followed: - * 1. first close topology to make sure all cached records in the topology are processed - * 2. then flush the state, send any left changelog records - * 3. then flush the record collector - * - * - * @param cleanshut down cleanly (ie, incl. flush) if {@code true} -- - * otherwise, just close open resources - * @throws TaskMigratedException if the task producer got fenced (EOS) - */ -private void prepareClose(final boolean clean) { -// Reset any previously scheduled checkpoint. -checkpoint = null; - +private void maybeScheduleCheckpoint() { switch (state()) { -case CREATED: -// the task is created and not initialized, just re-write the checkpoint file -scheduleCheckpoint(emptyMap()); +case RESTORING: +this.checkpoint = checkpointableOffsets(); + break; case RUNNING: -closeTopology(clean); - -if (clean) { -stateMgr.flush(); -recordCollector.flush(); -scheduleCheckpoint(checkpointableOffsets()); -} else { -executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log); +if (!eosEnabled) { +this.checkpoint = checkpointableOffsets(); } break; -case RESTORING: -executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log); -scheduleCheckpoint(emptyMap()); +case SUSPENDED: +this.checkpoint = checkpointableOffsets(); break; -case SUSPENDED: +case CREATED: case CLOSED: -// not need to checkpoint, since when suspending we've already committed the state -break; +throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id); default: -throw new IllegalStateException("Unknown state " + state() + " while prepare closing active task " + id); +throw new 
IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id); } } -private void scheduleCheckpoint(final Map checkpoint) { -this.checkpoint = checkpoint; -} - private void writeCheckpointIfNeed() { +if (commitNeeded) { Review comment: @mjsax By `should only be written if no commit is needed` do you mean `...if a commit was just completed`? Doesn't this break `closeAndRecycleState` (I thought I saw in another comment that we don't write checkpoints during recycle anymore?)
[GitHub] [kafka] ableegoldman commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management
ableegoldman commented on a change in pull request #8833: URL: https://github.com/apache/kafka/pull/8833#discussion_r438953766 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -504,88 +438,66 @@ public void update(final Set topicPartitions, final Map - * the following order must be followed: - * 1. first close topology to make sure all cached records in the topology are processed - * 2. then flush the state, send any left changelog records - * 3. then flush the record collector - * - * - * @param cleanshut down cleanly (ie, incl. flush) if {@code true} -- - * otherwise, just close open resources - * @throws TaskMigratedException if the task producer got fenced (EOS) - */ -private void prepareClose(final boolean clean) { -// Reset any previously scheduled checkpoint. -checkpoint = null; - +private void maybeScheduleCheckpoint() { switch (state()) { -case CREATED: -// the task is created and not initialized, just re-write the checkpoint file -scheduleCheckpoint(emptyMap()); +case RESTORING: +this.checkpoint = checkpointableOffsets(); + break; case RUNNING: -closeTopology(clean); - -if (clean) { -stateMgr.flush(); -recordCollector.flush(); -scheduleCheckpoint(checkpointableOffsets()); -} else { -executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log); +if (!eosEnabled) { +this.checkpoint = checkpointableOffsets(); } break; -case RESTORING: -executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log); -scheduleCheckpoint(emptyMap()); +case SUSPENDED: +this.checkpoint = checkpointableOffsets(); break; -case SUSPENDED: +case CREATED: case CLOSED: -// not need to checkpoint, since when suspending we've already committed the state -break; +throw new IllegalStateException("Illegal state " + state() + " while scheduling checkpoint for active task " + id); default: -throw new IllegalStateException("Unknown state " + state() + " while prepare closing active task " + id); +throw new 
IllegalStateException("Unknown state " + state() + " while scheduling checkpoint for active task " + id); } } -private void scheduleCheckpoint(final Map checkpoint) { -this.checkpoint = checkpoint; -} - private void writeCheckpointIfNeed() { +if (commitNeeded) { Review comment: @mjsax By `should only be written if no commit is needed` do you mean `...if a commit was just completed`? Doesn't this break `closeAndRecycleState` and `closeDirty` (since we tend to call `closeDirty` after the commit failed and would not have set `commitNeeded = false`)
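The invariant being debated in this thread — a checkpoint may only be written once any pending commit has completed, because otherwise the checkpoint would record offsets for state that has not been flushed — can be sketched with hypothetical names (`CheckpointSketch` is not the real `StreamTask`):

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointSketch {
    private boolean commitNeeded = false;
    private Map<Integer, Long> checkpoint; // scheduled offsets; null = nothing to write
    private final Map<Integer, Long> written = new HashMap<>();

    // Processing records dirties the task until the next commit.
    public void process() {
        commitNeeded = true;
    }

    // Committing clears the dirty flag and schedules offsets for checkpointing.
    public void commit(Map<Integer, Long> offsets) {
        commitNeeded = false;
        checkpoint = offsets;
    }

    // Guard from the stack trace above: writing a checkpoint while a commit
    // is still pending would persist offsets for unflushed state.
    public void writeCheckpointIfNeeded() {
        if (commitNeeded) {
            throw new IllegalStateException(
                "A checkpoint should only be written if no commit is needed.");
        }
        if (checkpoint != null) {
            written.putAll(checkpoint); // stand-in for writing the checkpoint file
            checkpoint = null;
        }
    }
}
```

Under this model, the `closeDirty` concern in the comment is real: a failed commit leaves `commitNeeded` set, so any close path that still calls the checkpoint write must either skip it or tolerate the flag.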
[jira] [Resolved] (KAFKA-8555) Flaky test ExampleConnectIntegrationTest#testSourceConnector
[ https://issues.apache.org/jira/browse/KAFKA-8555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-8555. --- Resolution: Fixed This test class has been running dependably for some time. Closing as fixed. > Flaky test ExampleConnectIntegrationTest#testSourceConnector > > > Key: KAFKA-8555 > URL: https://issues.apache.org/jira/browse/KAFKA-8555 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Boyang Chen >Priority: Major > Attachments: log-job139.txt, log-job141.txt, log-job23145.txt, > log-job23215.txt, log-job6046.txt > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/22798/console] > *02:03:21* > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector.test.stdout*02:03:21* > *02:03:21* > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest > > testSourceConnector FAILED*02:03:21* > org.apache.kafka.connect.errors.DataException: Insufficient records committed > by connector simple-conn in 15000 millis. Records expected=2000, > actual=1013*02:03:21* at > org.apache.kafka.connect.integration.ConnectorHandle.awaitCommits(ConnectorHandle.java:188)*02:03:21* > at > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector(ExampleConnectIntegrationTest.java:181)*02:03:21*
[jira] [Updated] (KAFKA-9851) Revoking Connect tasks due to connectivity issues should also clear running assignment
[ https://issues.apache.org/jira/browse/KAFKA-9851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9851: - Component/s: KafkaConnect > Revoking Connect tasks due to connectivity issues should also clear running > assignment > -- > > Key: KAFKA-9851 > URL: https://issues.apache.org/jira/browse/KAFKA-9851 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Konstantine Karantasis >Assignee: Konstantine Karantasis >Priority: Major > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > https://issues.apache.org/jira/browse/KAFKA-9184 fixed an issue with workers > continuing to run tasks even after they'd lose connectivity with the broker > coordinator and they'd detect that they are out of the group. > > However, because the revocation of tasks in this case is voluntary and does > not come with an explicit assignment (containing revoked tasks) from the > leader worker, the worker that quits running its tasks due to connectivity > issues needs to also clear its running task assignment snapshot. > This will allow for proper restart of the stopped tasks after the worker > rejoins the group when connectivity returns and get assigned the same > connectors or tasks.
[GitHub] [kafka] bbejeck commented on pull request #6229: KAFKA-6786: Removing additional configs for StreamsBrokerDownResilienceTest
bbejeck commented on pull request #6229: URL: https://github.com/apache/kafka/pull/6229#issuecomment-642777124 Hi @sh-abhi, > I'm having some difficulty doing a rebase successfully after trying some different approaches based on some references I found online. This PR looks good to me, can you say what your difficulties are? At any rate, I'll run this system test to verify the changes soon.
[GitHub] [kafka] bbejeck commented on pull request #6229: KAFKA-6786: Removing additional configs for StreamsBrokerDownResilienceTest
bbejeck commented on pull request #6229: URL: https://github.com/apache/kafka/pull/6229#issuecomment-642774237 Ok to test.
[GitHub] [kafka] junrao commented on pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)
junrao commented on pull request #8680: URL: https://github.com/apache/kafka/pull/8680#issuecomment-642751664 test this please
[GitHub] [kafka] junrao commented on pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)
junrao commented on pull request #8680: URL: https://github.com/apache/kafka/pull/8680#issuecomment-642750609 retest this please
[jira] [Updated] (KAFKA-10146) Backport KAFKA-9066 to 2.5 and 2.4 branches
[ https://issues.apache.org/jira/browse/KAFKA-10146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-10146: -- Labels: backport (was: ) > Backport KAFKA-9066 to 2.5 and 2.4 branches > --- > > Key: KAFKA-10146 > URL: https://issues.apache.org/jira/browse/KAFKA-10146 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Major > Labels: backport > Fix For: 2.4.2, 2.5.2 > > > KAFKA-9066 was merged on the same day we were trying to release 2.5.1, so > this was not backported at the time. However, once 2.5.1 is out the door, the > `775f0d484` commit on `trunk` should be backported to the `2.5` and `2.4` > branches.
[jira] [Updated] (KAFKA-10086) Standby state isn't always re-used when transitioning to active
[ https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10086: - Affects Version/s: (was: 2.7.0) > Standby state isn't always re-used when transitioning to active > --- > > Key: KAFKA-10086 > URL: https://issues.apache.org/jira/browse/KAFKA-10086 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 2.6.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.6.0 > > > This ticket was initially just to write an integration test, but I escalated > it to a blocker and changed the title when the integration test actually > surfaced two bugs: > # Offset positions were not reported for in-memory stores, so tasks with > in-memory stores would never be considered as "caught up" and could not take > over active processing, preventing clusters from ever achieving balance. This > is a regression in 2.6 > # When the TaskAssignor decided to switch active processing from a former > owner to a new one that had a standby, the lower-level cooperative rebalance > protocol would first de-schedule the task completely, and only later would > assign it to the new owner. For in-memory stores, this causes the standby > state not to be re-used, and for persistent stores, it creates a window in > which the cleanup thread might delete the state directory. In both cases, > even though the instance previously had a standby, once it gets the active, > it still had to restore the entire state from the changelog.
[jira] [Updated] (KAFKA-10146) Backport KAFKA-9066 to 2.5 and 2.4 branches
[ https://issues.apache.org/jira/browse/KAFKA-10146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-10146: -- Issue Type: Task (was: Bug) > Backport KAFKA-9066 to 2.5 and 2.4 branches > --- > > Key: KAFKA-10146 > URL: https://issues.apache.org/jira/browse/KAFKA-10146 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Major > Fix For: 2.4.2, 2.5.2 > > > KAFKA-9066 was merged on the same day we were trying to release 2.5.1, so > this was not backported at the time. However, once 2.5.1 is out the door, the > `775f0d484` commit on `trunk` should be backported to the `2.5` and `2.4` > branches.
[jira] [Updated] (KAFKA-10086) Standby state isn't always re-used when transitioning to active
[ https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10086: - Fix Version/s: (was: 2.7.0) > Standby state isn't always re-used when transitioning to active > --- > > Key: KAFKA-10086 > URL: https://issues.apache.org/jira/browse/KAFKA-10086 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 2.6.0, 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.6.0 > > > This ticket was initially just to write an integration test, but I escalated > it to a blocker and changed the title when the integration test actually > surfaced two bugs: > # Offset positions were not reported for in-memory stores, so tasks with > in-memory stores would never be considered as "caught up" and could not take > over active processing, preventing clusters from ever achieving balance. This > is a regression in 2.6 > # When the TaskAssignor decided to switch active processing from a former > owner to a new one that had a standby, the lower-level cooperative rebalance > protocol would first de-schedule the task completely, and only later would > assign it to the new owner. For in-memory stores, this causes the standby > state not to be re-used, and for persistent stores, it creates a window in > which the cleanup thread might delete the state directory. In both cases, > even though the instance previously had a standby, once it gets the active, > it still had to restore the entire state from the changelog.
[jira] [Commented] (KAFKA-10146) Backport KAFKA-9066 to 2.5 and 2.4 branches
[ https://issues.apache.org/jira/browse/KAFKA-10146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17133340#comment-17133340 ] Randall Hauch commented on KAFKA-10146: --- Added PR: https://github.com/apache/kafka/pull/8854 > Backport KAFKA-9066 to 2.5 and 2.4 branches > --- > > Key: KAFKA-10146 > URL: https://issues.apache.org/jira/browse/KAFKA-10146 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Major > Fix For: 2.4.2, 2.5.2 > > > KAFKA-9066 was merged on the same day we were trying to release 2.5.1, so > this was not backported at the time. However, once 2.5.1 is out the door, the > `775f0d484` commit on `trunk` should be backported to the `2.5` and `2.4` > branches. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10146) Backport KAFKA-9066 to 2.5 and 2.4 branches
[ https://issues.apache.org/jira/browse/KAFKA-10146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17133343#comment-17133343 ] Randall Hauch commented on KAFKA-10146: --- Be sure to update KAFKA-9066 fix versions when this is merged. > Backport KAFKA-9066 to 2.5 and 2.4 branches > --- > > Key: KAFKA-10146 > URL: https://issues.apache.org/jira/browse/KAFKA-10146 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Major > Fix For: 2.4.2, 2.5.2 > > > KAFKA-9066 was merged on the same day we were trying to release 2.5.1, so > this was not backported at the time. However, once 2.5.1 is out the door, the > `775f0d484` commit on `trunk` should be backported to the `2.5` and `2.4` > branches. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition
mumrah commented on a change in pull request #8841: URL: https://github.com/apache/kafka/pull/8841#discussion_r438870452 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ## @@ -978,7 +1000,7 @@ public boolean hasValidPosition() { @Override public boolean hasPosition() { Review comment: I thought about this too, but if i recall, the whole point of adding the state enum was to not rely on the instance variables to deduce the state. I think another solution here would be to have some kind of empty position monad to avoid the null in the first place. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
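The "empty position monad" idea mentioned in the comment above could look roughly like the following sketch. This is a hypothetical illustration, not the actual Kafka `SubscriptionState` API: the `PositionHolder` and simplified `FetchPosition` types are invented here to show how wrapping the nullable position in an `Optional` forces callers to handle the "no position yet" case explicitly instead of null-checking.

```java
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical sketch: wrap the nullable fetch position in an Optional-based
// holder so callers cannot dereference a missing position by accident.
public class PositionHolder {

    // Minimal stand-in for the real FetchPosition; fields simplified for the sketch.
    static final class FetchPosition {
        final long offset;
        FetchPosition(long offset) { this.offset = offset; }
    }

    private Optional<FetchPosition> position = Optional.empty();

    void set(FetchPosition p) { position = Optional.of(p); }

    // Callers express "do this only if a position exists" without null checks.
    void ifPresent(Consumer<FetchPosition> action) { position.ifPresent(action); }

    boolean hasPosition() { return position.isPresent(); }

    public static void main(String[] args) {
        PositionHolder holder = new PositionHolder();
        System.out.println(holder.hasPosition()); // false before any position is set
        holder.set(new FetchPosition(42L));
        holder.ifPresent(p -> System.out.println(p.offset)); // 42
    }
}
```

The trade-off discussed in the thread still applies: the state enum remains the source of truth for the fetch state, and the holder only removes the possibility of a raw null position leaking out.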
[GitHub] [kafka] rhauch opened a new pull request #8854: KAFKA-10146, KAFKA-9066: Retain metrics for failed tasks (#8502)
rhauch opened a new pull request #8854: URL: https://github.com/apache/kafka/pull/8854 Targets the `2.5` branch, and should be backported to the `2.4` branch. This backports the KAFKA-9066 / #8502 changes to retain metrics for failed tasks that were already merged to `trunk` and backported to the `2.6` branch. This PR has one change relative to the original PR: it removes an integration test added in the `2.6` branch for KIP-158 and modified as part of KAFKA-9066 / #8502. Ping @C0urante (original author) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-9374) Worker can be disabled by blocked connectors
[ https://issues.apache.org/jira/browse/KAFKA-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-9374. -- Reviewer: Konstantine Karantasis Resolution: Fixed Merged to `trunk` and backported to the `2.6` branch for inclusion in 2.6.0. > Worker can be disabled by blocked connectors > > > Key: KAFKA-9374 > URL: https://issues.apache.org/jira/browse/KAFKA-9374 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 2.6.0 > > > If a connector hangs during any of its {{initialize}}, {{start}}, {{stop}}, > \{taskConfigs}}, {{taskClass}}, {{version}}, {{config}}, or {{validate}} > methods, the worker will be disabled for some types of requests thereafter, > including connector creation, connector reconfiguration, and connector > deletion. > -This only occurs in distributed mode and is due to the threading model used > by the > [DistributedHerder|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java] > class.- This affects both distributed and standalone mode. Distributed > herders perform some connector work synchronously in their {{tick}} thread, > which also handles group membership and some REST requests. The majority of > the herder methods for the standalone herder are {{synchronized}}, including > those for creating, updating, and deleting connectors; as long as one of > those methods blocks, all subsequent calls to any of these methods will also > be blocked. > > One potential solution could be to treat connectors that fail to start, stop, > etc. 
in time similarly to tasks that fail to stop within the [task graceful > shutdown timeout > period|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L121-L126] > by handling all connector interactions on a separate thread, waiting for > them to complete within a timeout, and abandoning the thread (and > transitioning the connector to the {{FAILED}} state, if it has been created > at all) if that timeout expires. -- This message was sent by Atlassian Jira (v8.3.4#803005)
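The proposal in the ticket above — run each connector interaction on a separate thread, wait up to a timeout, and abandon the thread if it expires — can be sketched as follows. This is an illustrative sketch, not the actual Connect implementation; the `TIMEOUT_MS` value and `callWithTimeout` helper are assumptions mirroring the task graceful shutdown timeout idea.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of the proposed fix: wrap a potentially blocking connector lifecycle
// call (start, stop, validate, ...) in a worker thread with a timeout, so a
// hung connector cannot block the herder thread indefinitely.
public class TimedConnectorCall {

    static final long TIMEOUT_MS = 5_000L; // assumed value for the sketch

    // Returns true if the call completed in time; false if it was abandoned.
    static boolean callWithTimeout(Runnable connectorCall) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> future = executor.submit(connectorCall);
        try {
            future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Abandon the blocked thread; the connector would transition to FAILED.
            future.cancel(true);
            return false;
        } catch (InterruptedException | ExecutionException e) {
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // A well-behaved (fast) connector call completes within the timeout.
        System.out.println(callWithTimeout(() -> {})); // true
    }
}
```

The key property is that the herder thread regains control after at most `TIMEOUT_MS`, regardless of what the connector code does.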
[jira] [Updated] (KAFKA-9374) Worker can be disabled by blocked connectors
[ https://issues.apache.org/jira/browse/KAFKA-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9374: - Fix Version/s: 2.6.0 > Worker can be disabled by blocked connectors > > > Key: KAFKA-9374 > URL: https://issues.apache.org/jira/browse/KAFKA-9374 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 2.6.0 > > > If a connector hangs during any of its {{initialize}}, {{start}}, {{stop}}, > \{taskConfigs}}, {{taskClass}}, {{version}}, {{config}}, or {{validate}} > methods, the worker will be disabled for some types of requests thereafter, > including connector creation, connector reconfiguration, and connector > deletion. > -This only occurs in distributed mode and is due to the threading model used > by the > [DistributedHerder|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java] > class.- This affects both distributed and standalone mode. Distributed > herders perform some connector work synchronously in their {{tick}} thread, > which also handles group membership and some REST requests. The majority of > the herder methods for the standalone herder are {{synchronized}}, including > those for creating, updating, and deleting connectors; as long as one of > those methods blocks, all subsequent calls to any of these methods will also > be blocked. > > One potential solution could be to treat connectors that fail to start, stop, > etc. 
in time similarly to tasks that fail to stop within the [task graceful > shutdown timeout > period|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L121-L126] > by handling all connector interactions on a separate thread, waiting for > them to complete within a timeout, and abandoning the thread (and > transitioning the connector to the {{FAILED}} state, if it has been created > at all) if that timeout expires. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition
mumrah commented on a change in pull request #8841: URL: https://github.com/apache/kafka/pull/8841#discussion_r438867535 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -675,36 +676,41 @@ private ListOffsetResult fetchOffsetsByTimes(Map timestamp completedFetch.partition); } else { FetchPosition position = subscriptions.position(completedFetch.partition); -if (completedFetch.nextFetchOffset == position.offset) { -List> partRecords = completedFetch.fetchRecords(maxRecords); - -log.trace("Returning {} fetched records at offset {} for assigned partition {}", -partRecords.size(), position, completedFetch.partition); - -if (completedFetch.nextFetchOffset > position.offset) { -FetchPosition nextPosition = new FetchPosition( -completedFetch.nextFetchOffset, -completedFetch.lastEpoch, -position.currentLeader); -log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition); -subscriptions.position(completedFetch.partition, nextPosition); -} +if (position != null) { Review comment: I think we need the check in transition since we set the position in the runnable sometimes. E.g., ```java private void transitionState(FetchState newState, Runnable runIfTransitioned) { FetchState nextState = this.fetchState.transitionTo(newState); if (nextState.equals(newState)) { this.fetchState = nextState; runIfTransitioned.run(); if (this.position == null && nextState.hasPosition()) { throw new IllegalStateException("Transitioned subscription state to " + nextState + ", but position is null"); } } } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-9969) ConnectorClientConfigRequest is loaded in isolation and throws LinkageError
[ https://issues.apache.org/jira/browse/KAFKA-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-9969. -- Reviewer: Konstantine Karantasis Resolution: Fixed [~kkonstantine] merged to `trunk` and backported to: * `2.6` for inclusion in upcoming 2.6.0 * `2.5` for inclusion in upcoming 2.5.1 * `2.4` for inclusion in a future 2.4.2 * `2.3` for inclusion in a future 2.3.2 > ConnectorClientConfigRequest is loaded in isolation and throws LinkageError > --- > > Key: KAFKA-9969 > URL: https://issues.apache.org/jira/browse/KAFKA-9969 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.0, 2.4.1 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > ConnectorClientConfigRequest (added by > [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]) > is a class in connect-api, and should always be loaded by the system > classloader. 
If a plugin packages the connect-api jar, the REST API may fail > with the following stacktrace: > {noformat} > java.lang.LinkageError: loader constraint violation: loader (instance of > sun/misc/Launcher$AppClassLoader) previously initiated loading for a > different type with name > "org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest" at > java.lang.ClassLoader.defineClass1(Native Method) at > java.lang.ClassLoader.defineClass(ClassLoader.java:763) at > java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at > java.net.URLClassLoader.defineClass(URLClassLoader.java:468) at > java.net.URLClassLoader.access$100(URLClassLoader.java:74) at > java.net.URLClassLoader$1.run(URLClassLoader.java:369) at > java.net.URLClassLoader$1.run(URLClassLoader.java:363) at > java.security.AccessController.doPrivileged(Native Method) at > java.net.URLClassLoader.findClass(URLClassLoader.java:362) at > java.lang.ClassLoader.loadClass(ClassLoader.java:424) at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at > java.lang.ClassLoader.loadClass(ClassLoader.java:357) at > org.apache.kafka.connect.runtime.AbstractHerder.validateClientOverrides(AbstractHerder.java:416) > at > org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:342) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:745) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:742) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:342) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:282) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > {noformat} > It appears that the other class in org.apache.kafka.connect.connector.policy, > ConnectorClientConfigOverridePolicy had a similar issue in KAFKA-8415, and > received a fix. -- This message was sent by Atlassian Jira (v8.3.4#803005)
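The direction of the fix described above — connect-api classes must always come from a single (parent) classloader even when a plugin bundles the connect-api jar — can be sketched with a delegation predicate. This is a simplified, hypothetical sketch, not Connect's actual `PluginClassLoader`; the child-first lookup for plugin classes is elided.

```java
// Sketch: a plugin classloader that delegates framework/API packages to the
// parent loader, so classes such as ConnectorClientConfigRequest have exactly
// one defining loader and cannot trigger a LinkageError.
public class PluginSketchClassLoader extends ClassLoader {

    PluginSketchClassLoader(ClassLoader parent) { super(parent); }

    // Framework/API classes must come from the parent (system) loader.
    static boolean shouldDelegate(String name) {
        return name.startsWith("org.apache.kafka.");
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (shouldDelegate(name)) {
            return super.loadClass(name, resolve); // parent-first for API classes
        }
        // ... child-first lookup for plugin-local classes would go here ...
        return super.loadClass(name, resolve);
    }
}
```

With this filter in place, a connect-api class packaged inside a plugin jar is simply ignored in favor of the copy the system classloader already defined.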
[jira] [Updated] (KAFKA-9969) ConnectorClientConfigRequest is loaded in isolation and throws LinkageError
[ https://issues.apache.org/jira/browse/KAFKA-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9969: - Fix Version/s: 2.5.1 2.4.2 2.6.0 2.3.2 > ConnectorClientConfigRequest is loaded in isolation and throws LinkageError > --- > > Key: KAFKA-9969 > URL: https://issues.apache.org/jira/browse/KAFKA-9969 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.0, 2.4.1 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > ConnectorClientConfigRequest (added by > [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]) > is a class in connect-api, and should always be loaded by the system > classloader. If a plugin packages the connect-api jar, the REST API may fail > with the following stacktrace: > {noformat} > java.lang.LinkageError: loader constraint violation: loader (instance of > sun/misc/Launcher$AppClassLoader) previously initiated loading for a > different type with name > "org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest" at > java.lang.ClassLoader.defineClass1(Native Method) at > java.lang.ClassLoader.defineClass(ClassLoader.java:763) at > java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at > java.net.URLClassLoader.defineClass(URLClassLoader.java:468) at > java.net.URLClassLoader.access$100(URLClassLoader.java:74) at > java.net.URLClassLoader$1.run(URLClassLoader.java:369) at > java.net.URLClassLoader$1.run(URLClassLoader.java:363) at > java.security.AccessController.doPrivileged(Native Method) at > java.net.URLClassLoader.findClass(URLClassLoader.java:362) at > java.lang.ClassLoader.loadClass(ClassLoader.java:424) at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at > java.lang.ClassLoader.loadClass(ClassLoader.java:357) at > 
org.apache.kafka.connect.runtime.AbstractHerder.validateClientOverrides(AbstractHerder.java:416) > at > org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:342) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:745) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:742) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:342) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:282) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > {noformat} > It appears that the other class in org.apache.kafka.connect.connector.policy, > ConnectorClientConfigOverridePolicy had a similar issue in KAFKA-8415, and > received a fix. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10148) Kafka Streams Restores too few Records with eos-beta Enabled
[ https://issues.apache.org/jira/browse/KAFKA-10148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-10148: -- Description: System test {{StreamsEosTest.test_failure_and_recovery}} for eos-beta exposes a bug that results in wrong results in the output topic. The cause seems to be a too low end offset during restoration of a state store. Example: The system test computes a minimum aggregate over records in an input topic and writes the results to an output topic. The input topic partition {{data-1}} contains the following records among others: {code} ... offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 headerKeys: [] key: 14920 payload: 9215 ... offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 headerKeys: [] key: 14920 payload: 1595 ... offset: 2104 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 2104 headerKeys: [] key: 14920 payload: 9274 ... {code} The output topic partition {{min-1}} contains: {code} ... offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 headerKeys: [] key: 14920 payload: 9215 ... offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 headerKeys: [] key: 14920 payload: 1595 ... offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 headerKeys: [] key: 14920 payload: 9215 ... {code} The last record is obviously wrong because 1595 is less than 9215. To test the resilience to an unexpected failure of a Streams client, the system tests aborts a Streams client, i.e., the client is closed in a dirty manner. This dirty close causes the Streams client to restore its local state store that maintains the minimum aggregate from the beginning of the changelog topic partitions {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}}. The partition {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}} contains: {code} ... 
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 headerKeys: [] key: 14920 payload: 9215 ... offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 headerKeys: [] key: 14920 payload: 1595 ... offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 headerKeys: [] key: 14920 payload: 9215 ... {code} Also here the last record is wrong. During the restoration, the Streams client uses its Kafka consumer to issue a list offsets request to get the end offset of the changelog topic partition. The response to the list offsets request contains end offset 1518 for {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}} as can be seen here: {code} [2020-06-09 08:11:49,250] DEBUG [Consumer clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer, groupId=null] Received LIST_OFFSETS response from node 2 for request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer, correlationId=3): (type=ListOffsetResponse, throttleTimeMs=0, responseData={EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-4=PartitionData(errorCode: 0, timestamp: -1, offset: 1478, leaderEpoch: Optional[0]), EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1=PartitionData(errorCode: 0, timestamp: -1, offset: 1518, leaderEpoch: Optional[0])}) (org.apache.kafka.clients.NetworkClient) {code} Offset 1518 is before record in {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}} {code} offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 headerKeys: [] key: 14920 payload: 1595 {code} Hence, this record is not restored into the local state store. However, after the restoration the input topic partition {{data-1}} is read starting with offset 2094. 
That means that record {code} offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 headerKeys: [] key: 14920 payload: 1595 {code} is not read there either because it has a lower offset. Instead the following record with key 14920 and value 9274 is read, but since 9274 is not less than 9215, value 9215 is written a second time to the output topic. I ran the system tests 10x with eos_alpha and 10x with eos_beta and only eos_beta failed a couple of times. was: System test {{StreamsEosTest.test_failure_and_recovery}} for eos-beta exposes a bug that results in wrong results in the output topic. The cause seems to be a too low end offset during restoration of a state store. Example: The system test computes a minimum aggregate over records in an input topic and writes the results to an output topic. The input topic partition {{data-1}} contains the following records among others: {code} ... offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 headerKeys: [] key: 14920 payload: 9
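The effect of the stale end offset described in the report can be illustrated with a tiny simulation. This is a simplified sketch using the offsets and values from the report above, not Streams' actual restoration code: restoring the min-aggregate store only up to end offset 1518 skips the changelog record at offset 1828, so the store keeps the stale minimum 9215 instead of 1595.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: state-store restoration replays changelog records strictly below
// the end offset reported by LIST_OFFSETS. A stale end offset truncates the
// replay and the store misses later updates.
public class StaleEndOffsetDemo {

    static final class ChangelogRecord {
        final long offset; final String key; final int value;
        ChangelogRecord(long offset, String key, int value) {
            this.offset = offset; this.key = key; this.value = value;
        }
    }

    static Map<String, Integer> restore(List<ChangelogRecord> changelog, long endOffset) {
        Map<String, Integer> store = new HashMap<>();
        for (ChangelogRecord r : changelog) {
            if (r.offset < endOffset) {        // restoration stops at the end offset
                store.put(r.key, r.value);     // changelog keeps the latest value per key
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<ChangelogRecord> changelog = Arrays.asList(
                new ChangelogRecord(125L, "14920", 9215),
                new ChangelogRecord(1828L, "14920", 1595));
        // Stale end offset 1518 (as in the LIST_OFFSETS response) skips offset 1828:
        System.out.println(restore(changelog, 1518L).get("14920")); // 9215
        // An end offset past 1828 restores the true minimum:
        System.out.println(restore(changelog, 2000L).get("14920")); // 1595
    }
}
```

This matches the observed symptom: with the record at offset 1828 never restored, the client later re-emits 9215 even though 1595 had already been written to the output topic.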
[GitHub] [kafka] bellemare commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes
bellemare commented on pull request #8764: URL: https://github.com/apache/kafka/pull/8764#issuecomment-642720296 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org