[GitHub] [kafka] chia7712 commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
chia7712 commented on a change in pull request #10304: URL: https://github.com/apache/kafka/pull/10304#discussion_r595727918

## File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala

```
## @@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.admin
```

Review comment: the package name should be `kafka.admin` rather than `unit.kafka.admin`.

## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala

```
## @@ -39,19 +39,30 @@ object LogDirsCommand {
   def describe(args: Array[String], out: PrintStream): Unit = {
     val opts = new LogDirsCommandOptions(args)
     val adminClient = createAdminClient(opts)
-    val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-    val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-      case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-      case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-    }
+    try {
+      val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+      val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+      val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+        case Some(brokerListStr) =>
+          val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+          (inputBrokers, inputBrokers.diff(clusterBrokers))
```

Review comment: As the variable is called `existingBrokers`, it should hold the brokers that actually exist. In short, it should return `inputBrokers.intersect(clusterBrokers)` rather than `inputBrokers`.
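For illustration, here is a minimal Java sketch of the set logic the reviewer is suggesting (variable names are hypothetical; the actual PR code is Scala):

```java
import java.util.HashSet;
import java.util.Set;

public class BrokerSets {
    public static void main(String[] args) {
        // Brokers the user passed on the command line vs. brokers actually in the cluster.
        Set<Integer> inputBrokers = new HashSet<>(Set.of(1, 2, 5));
        Set<Integer> clusterBrokers = new HashSet<>(Set.of(1, 2, 3));

        // "Existing" brokers should be the intersection of the two sets...
        Set<Integer> existingBrokers = new HashSet<>(inputBrokers);
        existingBrokers.retainAll(clusterBrokers);    // {1, 2}

        // ...while the non-existing brokers are the difference.
        Set<Integer> nonExistingBrokers = new HashSet<>(inputBrokers);
        nonExistingBrokers.removeAll(clusterBrokers); // {5}

        System.out.println(existingBrokers + " / " + nonExistingBrokers);
    }
}
```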
[GitHub] [kafka] dongjinleekr commented on pull request #10339: MINOR: Remove redundant allows in import-control.xml
dongjinleekr commented on pull request #10339: URL: https://github.com/apache/kafka/pull/10339#issuecomment-800808391

@dajac @rondagostino @ijuma Could you have a look?
[GitHub] [kafka] dongjinleekr opened a new pull request #10339: MINOR: Remove redundant allows in import-control.xml
dongjinleekr opened a new pull request #10339: URL: https://github.com/apache/kafka/pull/10339

I found this problem while working on [KIP-719: Add Log4J2 Appender](https://cwiki.apache.org/confluence/display/KAFKA/KIP-719%3A+Add+Log4J2+Appender).

1. `org.apache.log4j` does not need to be allowed in the shell and trogdor subpackages; they use `slf4j`, not `log4j`.
2. `org.slf4j` does not need to be allowed in the clients and server subpackages: `org.slf4j` is allowed globally.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] showuon opened a new pull request #10338: KAFKA-10251: wait for consumer rebalance completed before consuming records
showuon opened a new pull request #10338: URL: https://github.com/apache/kafka/pull/10338

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
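The PR title describes waiting for the consumer rebalance to complete before consuming records. A generic Java sketch of that pattern, assuming a plain consumer in a test (this is not the PR's actual change):

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;

public final class RebalanceWait {
    // Poll with a short timeout until the group assignment is non-empty, so the
    // real consumption loop only starts once the rebalance has completed.
    public static void awaitAssignment(Consumer<?, ?> consumer, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (consumer.assignment().isEmpty()) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException("Rebalance did not complete within " + timeoutMs + " ms");
            }
            consumer.poll(Duration.ofMillis(100)); // drives the group protocol
        }
    }
}
```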
[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
wenbingshen commented on pull request #10304: URL: https://github.com/apache/kafka/pull/10304#issuecomment-800805388

Good afternoon @chia7712 @dajac. Dear committers, if you have any comments on this PR, I will continue to improve it. If you are satisfied with it, could this PR be merged into trunk? :)
[jira] [Assigned] (KAFKA-12384) Flaky Test ListOffsetsRequestTest.testResponseIncludesLeaderEpoch
[ https://issues.apache.org/jira/browse/KAFKA-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen reassigned KAFKA-12384:
---------------------------------
Assignee: Luke Chen  (was: Kamal Chandraprakash)

> Flaky Test ListOffsetsRequestTest.testResponseIncludesLeaderEpoch
> -----------------------------------------------------------------
>
>                 Key: KAFKA-12384
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12384
>             Project: Kafka
>          Issue Type: Test
>          Components: core, unit tests
>            Reporter: Matthias J. Sax
>            Assignee: Luke Chen
>            Priority: Critical
>              Labels: flaky-test
>
> {quote}org.opentest4j.AssertionFailedError: expected: <(0,0)> but was: <(-1,-1)>
> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
> at org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
> at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
> at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
> at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124)
> at kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch(ListOffsetsRequestTest.scala:172){quote}
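A failure like `expected: <(0,0)> but was: <(-1,-1)>` typically means the test asserted before the broker metadata (here, the leader epoch) had propagated. A generic wait-until helper of the kind such fixes usually rely on — a hypothetical illustration, not the actual fix in this ticket:

```java
import java.util.function.BooleanSupplier;

public final class TestWait {
    // Retry a condition until it holds or the timeout elapses, instead of
    // asserting immediately after an asynchronous operation.
    public static void waitUntil(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Condition not met within " + timeoutMs + " ms");
            }
            Thread.sleep(100);
        }
    }
}
```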
[GitHub] [kafka] jsancio commented on pull request #10324: MINOR: Add a few more benchmark for the timeline map
jsancio commented on pull request #10324: URL: https://github.com/apache/kafka/pull/10324#issuecomment-800747908

@cmccabe the `testGetEntries` benchmark throws an exception when used against a snapshot:

```
java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0
	at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
	at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
	at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
	at java.base/java.util.Objects.checkIndex(Objects.java:372)
	at java.base/java.util.ArrayList.get(ArrayList.java:459)
	at org.apache.kafka.jmh.timeline.TimelineHashMapBenchmark.testGetEntries(TimelineHashMapBenchmark.java:221)
	at org.apache.kafka.jmh.timeline.jmh_generated.TimelineHashMapBenchmark_testGetEntries_jmhTest.testGetEntries_avgt_jmhStub(TimelineHashMapBenchmark_testGetEntries_jmhTest.java:246)
	at org.apache.kafka.jmh.timeline.jmh_generated.TimelineHashMapBenchmark_testGetEntries_jmhTest.testGetEntries_AverageTime(TimelineHashMapBenchmark_testGetEntries_jmhTest.java:183)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:453)
	at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:437)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
```
[GitHub] [kafka] jsancio commented on pull request #10324: MINOR: Add a few more benchmark for the timeline map
jsancio commented on pull request #10324: URL: https://github.com/apache/kafka/pull/10324#issuecomment-800747379

@ijuma Ready for review. I changed the benchmark structure to remove duplicate code.
[GitHub] [kafka] jsancio commented on a change in pull request #10324: MINOR: Add a few more benchmark for the timeline map
jsancio commented on a change in pull request #10324: URL: https://github.com/apache/kafka/pull/10324#discussion_r595670643

## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java

```
## @@ -44,33 +49,126 @@ public class TimelineHashMapBenchmark {
     private final static int NUM_ENTRIES = 1_000_000;
 
+    @State(Scope.Thread)
+    public static class HashMapInput {
+        public HashMap<Integer, String> map;
+        public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+        @Setup(Level.Invocation)
+        public void setup() {
+            map = new HashMap<>(keys.size());
+            for (Integer key : keys) {
+                map.put(key, String.valueOf(key));
+            }
+
+            Collections.shuffle(keys);
+        }
+    }
+
+    @State(Scope.Thread)
+    public static class ImmutableMapInput {
+        scala.collection.immutable.HashMap<Integer, String> map;
+        public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+        @Setup(Level.Invocation)
+        public void setup() {
+            map = new scala.collection.immutable.HashMap<>();
+            for (Integer key : keys) {
+                map = map.updated(key, String.valueOf(key));
+            }
+
+            Collections.shuffle(keys);
+        }
+    }
+
+    @State(Scope.Thread)
+    public static class TimelineMapInput {
+        public SnapshotRegistry snapshotRegistry;
+        public TimelineHashMap<Integer, String> map;
+        public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+        @Setup(Level.Invocation)
+        public void setup() {
+            snapshotRegistry = new SnapshotRegistry(new LogContext());
+            map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+
+            for (Integer key : keys) {
+                map.put(key, String.valueOf(key));
+            }
+
+            Collections.shuffle(keys);
+        }
+    }
+
+    @State(Scope.Thread)
+    public static class TimelineMapSnapshotInput {
+        public SnapshotRegistry snapshotRegistry;
+        public TimelineHashMap<Integer, String> map;
+        public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+        @Setup(Level.Invocation)
+        public void setup() {
+            snapshotRegistry = new SnapshotRegistry(new LogContext());
+            map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+
+            for (Integer key : keys) {
+                map.put(key, String.valueOf(key));
+            }
+
+            int count = 0;
+            for (Integer key : keys) {
+                if (count % 1_000 == 0) {
+                    snapshotRegistry.deleteSnapshotsUpTo(count - 10_000);
+                    snapshotRegistry.createSnapshot(count);
+                }
+                map.put(key, String.valueOf(key));
+                count++;
+            }
+
+            Collections.shuffle(keys);
+        }
+    }
+
     @Benchmark
     public Map<Integer, String> testAddEntriesInHashMap() {
-        HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES);
+        HashMap<Integer, String> map = new HashMap<>();
         for (int i = 0; i < NUM_ENTRIES; i++) {
             int key = (int) (0xffffffffL & ((i * 2862933555777941757L) + 3037000493L));
             map.put(key, String.valueOf(key));
         }
+
+        return map;
+    }
+
+    @Benchmark
+    public scala.collection.immutable.HashMap<Integer, String> testAddEntriesInImmutableMap() {
+        scala.collection.immutable.HashMap<Integer, String> map = new scala.collection.immutable.HashMap<>();
+        for (int i = 0; i < NUM_ENTRIES; i++) {
+            int key = (int) (0xffffffffL & ((i * 2862933555777941757L) + 3037000493L));
+            map = map.updated(key, String.valueOf(key));
+        }
+
         return map;
     }
 
     @Benchmark
     public Map<Integer, String> testAddEntriesInTimelineMap() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        TimelineHashMap<Integer, String> map =
-            new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES);
+        TimelineHashMap<Integer, String> map = new TimelineHashMap<>(snapshotRegistry, 16);
         for (int i = 0; i < NUM_ENTRIES; i++) {
             int key = (int) (0xffffffffL & ((i * 2862933555777941757L) + 3037000493L));
```

Review comment: Done.
[GitHub] [kafka] sridhav commented on pull request #10337: KAFKA-12380: Executor in Connect's Worker is not shut down when the worker is
sridhav commented on pull request #10337: URL: https://github.com/apache/kafka/pull/10337#issuecomment-800746746

@rhauch can you please review?
[GitHub] [kafka] jsancio commented on pull request #10324: MINOR: Add a few more benchmark for the timeline map
jsancio commented on pull request #10324: URL: https://github.com/apache/kafka/pull/10324#issuecomment-800745760

Test results after fixing the benchmark tests:

```
Benchmark                                               (mapType)  (size)  Mode  Cnt    Score    Error  Units
TimelineHashMapBenchmark.testAddEntries                  HASH_MAP     100  avgt   10  184.183 ± 12.318  ms/op
TimelineHashMapBenchmark.testAddEntries            SCALA_HASH_MAP     100  avgt   10  350.935 ±  4.801  ms/op
TimelineHashMapBenchmark.testAddEntries              TIMELINE_MAP     100  avgt   10  340.839 ± 15.397  ms/op
TimelineHashMapBenchmark.testAddEntries     TIMELINE_SNAPSHOT_MAP     100  avgt   10  332.535 ± 36.350  ms/op
TimelineHashMapBenchmark.testGetEntries                  HASH_MAP     100  avgt   10   37.772 ±  4.717  ms/op
TimelineHashMapBenchmark.testGetEntries            SCALA_HASH_MAP     100  avgt   10  248.350 ±  4.445  ms/op
TimelineHashMapBenchmark.testGetEntries              TIMELINE_MAP     100  avgt   10   83.487 ±  6.952  ms/op
TimelineHashMapBenchmark.testIterateEntries              HASH_MAP     100  avgt   10   42.743 ±  1.184  ms/op
TimelineHashMapBenchmark.testIterateEntries        SCALA_HASH_MAP     100  avgt   10   36.030 ±  0.937  ms/op
TimelineHashMapBenchmark.testIterateEntries          TIMELINE_MAP     100  avgt   10   54.760 ±  2.866  ms/op
TimelineHashMapBenchmark.testRemoveEntries               HASH_MAP     100  avgt   10   26.246 ±  1.141  ms/op
TimelineHashMapBenchmark.testRemoveEntries         SCALA_HASH_MAP     100  avgt   10  430.861 ± 13.864  ms/op
TimelineHashMapBenchmark.testRemoveEntries           TIMELINE_MAP     100  avgt   10   79.832 ± 12.833  ms/op
TimelineHashMapBenchmark.testRemoveEntries  TIMELINE_SNAPSHOT_MAP     100  avgt   10  185.170 ± 13.464  ms/op
TimelineHashMapBenchmark.testUpdateEntries               HASH_MAP     100  avgt   10   84.963 ± 10.411  ms/op
TimelineHashMapBenchmark.testUpdateEntries         SCALA_HASH_MAP     100  avgt   10  426.490 ±  6.468  ms/op
TimelineHashMapBenchmark.testUpdateEntries           TIMELINE_MAP     100  avgt   10  160.341 ± 13.799  ms/op
TimelineHashMapBenchmark.testUpdateEntries  TIMELINE_SNAPSHOT_MAP     100  avgt   10  300.875 ± 35.965  ms/op

JMH benchmarks done
```
[GitHub] [kafka] chia7712 commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…
chia7712 commented on pull request #10269: URL: https://github.com/apache/kafka/pull/10269#issuecomment-800745691

> is there any work in progress for a KafkaApis.handleFetchRequest test? I suspect it would be similar but maybe a bit harder than what I did for the LeaderAndIsr version #10071 (trading replicamanager for fetchmanager, etc). This benchmark would be helpful for #9944 as you could probably guess :)

This PR is blocked by #9944. This PR (and other related issues) aims to remove all extra collection creation by using auto-generated data. In #9944 we have to create a lot of collections to handle the topic id in the fetch request. Hence, I need to rethink the value (and approach) of this PR :)
[GitHub] [kafka] sridhav opened a new pull request #10337: KAFKA-12380: Executor in Connect's Worker is not shut down when the worker is
sridhav opened a new pull request #10337: URL: https://github.com/apache/kafka/pull/10337

When the worker is stopped, it does not shut down this executor.

The following tests were run:
* `./gradlew connect:test`
* `./gradlew connect:unitTest`
* `./gradlew connect:integrationTest`

### Committer Checklist (excluded from commit message)
- [x] Verify design and implementation
- [x] Verify test coverage and CI build status
- [x] Verify documentation (including upgrade notes)
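A fix of this shape boils down to the standard `ExecutorService` shutdown idiom. A self-contained sketch of that idiom — the general pattern, not necessarily the exact code in the PR:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public final class ExecutorShutdown {
    // Stop accepting new work, wait briefly for in-flight tasks, then
    // force-interrupt anything still running.
    public static void shutdown(ExecutorService executor, long timeoutMs) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```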
[GitHub] [kafka] chia7712 commented on pull request #10292: MINOR: fix client_compatibility_features_test.py
chia7712 commented on pull request #10292: URL: https://github.com/apache/kafka/pull/10292#issuecomment-800743083

@cmccabe @rondagostino thanks for all your comments (and the nice explanations)! I have updated the code according to the reviews. Please take a look.
[GitHub] [kafka] chia7712 commented on a change in pull request #10292: MINOR: fix client_compatibility_features_test.py
chia7712 commented on a change in pull request #10292: URL: https://github.com/apache/kafka/pull/10292#discussion_r595666144

## File path: tests/kafkatest/tests/client/client_compatibility_features_test.py

```
## @@ -81,8 +81,12 @@ def __init__(self, test_context):
         self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics=self.topics)
 
     def invoke_compatibility_program(self, features):
-        # Run the compatibility test on the first Kafka node.
-        node = self.kafka.nodes[0]
+        if self.zk:
+            # kafka nodes are set to older version so resolved script path is linked to older assembly.
+            # run the compatibility test on the first zk node to get script path linked to latest(dev) assembly.
+            node = self.zk.nodes[0]
+        else:
+            node = self.kafka.nodes[0]
```

Review comment: copy that
[GitHub] [kafka] tang7526 commented on pull request #10332: KAFKA-10697: Remove ProduceResponse.responses
tang7526 commented on pull request #10332: URL: https://github.com/apache/kafka/pull/10332#issuecomment-800730712

@chia7712 Could you help review this PR? Thanks.
[jira] [Comment Edited] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302981#comment-17302981 ]

Chris Egerton edited comment on KAFKA-12463 at 3/17/21, 1:34 AM:
-----------------------------------------------------------------

Fun fact--Connect does not like cooperative consumers. See https://issues.apache.org/jira/browse/KAFKA-12487 for details. Until/unless that's addressed, we can't change the default partitioner for Connect to the {{CooperativeStickyAssignor}}.

[~rhauch] thanks for the rewrite, I hope that makes things easier to read. Responses inline:

{quote}Specifically, isn't it true that after the Connect worker config is changed, one rolling restart of the Connect workers will not immediately use the cooperative assignor (since the last worker to restart will only use the range assignor, the default), and that the cooperative assignor will only be used after a second rolling restart.{quote}

That's true if the second rolling restart includes a change to the partition assignment strategy for its consumers to only use the {{CooperativeStickyAssignor}} and remove the {{RangeAssignor}} from the list, yes. Given that and the recently-discovered KAFKA-12487, I don't even think we're at the point of updating the description yet to mention an upgrade process that would accommodate the {{CooperativeStickyAssignor}} without having a warning notice preceding any hypothetical designs we might implement once both of these pain points are addressed. I've updated the description accordingly; feel free to make any edits as long as we don't actually instruct users how to configure their workers with the {{CooperativeStickyAssignor}}, as that will lead to bad worker behavior.

{quote}In both standalone and distributed, Connect only updates the task configs if and only if the task configs changed. If a user *only* changes the {{consumer.override.partition.assignment.strategy}} property in a connector config, then the task configs will not be changed and the tasks will not be restarted.{quote}

Ah yes, KAFKA-9228. When I initially discovered that one I was a little alarmed, but it turns out that the overwhelming majority of connectors out there are unaffected, just by a quirk of how people tend to implement {{Connector::taskConfigs}}. The only cases I've been able to find where this bug comes up are in the file stream connectors. If we believe this is likely to affect other connectors, I personally think we should be addressing that bug instead of working around it or documenting it as a potential gotcha.

{quote}If Connect does attempt to restart the connector's tasks, the herder does not wait for the tasks to stop before starting any of them. This means that it may take several such connector & task restarts before the cooperative partition assignment strategy will take effect.{quote}

That's a fair point, and it applies to any change of partition assignment strategy, not just specifically moving from an eager to a cooperative one. This becomes especially likely if a task isn't able to respond to a shutdown request within the graceful shutdown period (which defaults to five seconds). The workaround here is to enable both partition assignment strategies for the consumer with a preference for the desired strategy; that way, the desired strategy will take effect as soon as every consumer in the group has been updated, and nobody will break beforehand. I'll update the workaround section in the description to include that info.

I'd also just like to point out that the goal here is to improve the out-of-the-box behavior of Connect for users; although workarounds are nice to have, the goal here shouldn't be to focus on documenting them but instead to make them obsolete. If we decide not to improve the default behavior of Connect then we can document this somewhere else that's a little more visible for users as opposed to developers. And there aren't any technical limitations preventing us from choosing a non-cooperative assignor right now and running with it, so if worst comes to worst, we might consider switching to, e.g., the {{RoundRobinAssignor}} and calling that good enough.
[GitHub] [kafka] jsancio commented on a change in pull request #10334: MINOR: Fix BaseHashTable sizing
jsancio commented on a change in pull request #10334: URL: https://github.com/apache/kafka/pull/10334#discussion_r595640017

## File path: metadata/src/main/java/org/apache/kafka/timeline/BaseHashTable.java

```
## @@ -56,12 +58,30 @@
         this.elements = new Object[expectedSizeToCapacity(expectedSize)];
     }
 
+    /**
+     * Calculate the capacity we should provision, given the expected size.
+     *
+     * Our capacity must always be a power of 2, and never less than 2.
+     */
     static int expectedSizeToCapacity(int expectedSize) {
-        if (expectedSize <= 1) {
-            return 2;
+        if (expectedSize >= MAX_CAPACITY / 2) {
+            return MAX_CAPACITY;
+        }
+        return Math.max(MIN_CAPACITY, roundUpToPowerOfTwo(expectedSize * 2));
+    }
+
+    private static int roundUpToPowerOfTwo(int i) {
+        if (i < 0) {
+            return 0;
         }
-        double sizeToFit = expectedSize / MAX_LOAD_FACTOR;
-        return (int) Math.min(MAX_CAPACITY, Math.ceil(Math.log(sizeToFit) / LN_2));
+        i = i - 1;
+        i |= i >> 1;
+        i |= i >> 2;
+        i |= i >> 4;
+        i |= i >> 8;
+        i |= i >> 16;
+        i = i + 1;
+        return i < 0 ? MAX_CAPACITY : i;
```

Review comment: Or https://github.com/google/guava/blob/master/guava/src/com/google/common/math/IntMath.java#L56-L72 — it is safe to look at, as it is Apache License 2.0.
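For comparison, the same rounding can be written with `Integer.numberOfLeadingZeros`, which is essentially what the Guava method linked above does. A sketch, assuming the input satisfies `0 < i <= 2^30` (above that, an `int` cannot hold the next power of two):

```java
public final class PowerOfTwo {
    // Round i up to the nearest power of two by counting the leading zeros
    // of i - 1. For i = 1, numberOfLeadingZeros(0) is 32, so the shift is 0
    // and the result is 1.
    static int roundUpToPowerOfTwo(int i) {
        return 1 << (32 - Integer.numberOfLeadingZeros(i - 1));
    }

    public static void main(String[] args) {
        System.out.println(roundUpToPowerOfTwo(1)); // 1
        System.out.println(roundUpToPowerOfTwo(5)); // 8
        System.out.println(roundUpToPowerOfTwo(8)); // 8
    }
}
```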
[GitHub] [kafka] rondagostino commented on a change in pull request #10292: MINOR: fix client_compatibility_features_test.py
rondagostino commented on a change in pull request #10292: URL: https://github.com/apache/kafka/pull/10292#discussion_r595639780

## File path: tests/kafkatest/tests/client/client_compatibility_features_test.py

```
## @@ -81,8 +81,12 @@ def __init__(self, test_context):
         self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics=self.topics)
 
     def invoke_compatibility_program(self, features):
-        # Run the compatibility test on the first Kafka node.
-        node = self.kafka.nodes[0]
+        if self.zk:
+            # kafka nodes are set to older version so resolved script path is linked to older assembly.
+            # run the compatibility test on the first zk node to get script path linked to latest(dev) assembly.
+            node = self.zk.nodes[0]
+        else:
+            node = self.kafka.nodes[0]
```

Review comment: We can use the dev version of the tool on the Kafka node via code like this:

```
node = self.kafka.nodes[0]
cmd = ("%s org.apache.kafka.tools.ClientCompatibilityTest "
       "--bootstrap-server %s "
       "--num-cluster-nodes %d "
       "--topic %s " % (self.dev_script_path,
                        self.kafka.bootstrap_servers(),
                        len(self.kafka.nodes),
                        list(self.topics.keys())[0]))
```

And then further down we can define the DEV script path like this:

```
# Always use the latest version of org.apache.kafka.tools.ClientCompatibilityTest
# so store away the path to the DEV version before we set the Kafka version
self.dev_script_path = self.kafka.path.script("kafka-run-class.sh", self.kafka.nodes[0])
self.kafka.set_version(KafkaVersion(broker_version))
```

I tested this locally and it solves the problem.
[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready
cmccabe commented on a change in pull request #10281: URL: https://github.com/apache/kafka/pull/10281#discussion_r595633992

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```
## @@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                 continue;
             }
             Node node = entry.getKey();
+            if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                log.trace("Still waiting for other calls to finish on node {}.", node);
+                nodeReadyDeadlines.remove(node);
+                continue;
```

Review comment: Oh, and on the question of why the in-flight requests structure holds a list: it was done for future-proofing.
[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready
cmccabe commented on a change in pull request #10281: URL: https://github.com/apache/kafka/pull/10281#discussion_r595633668

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```
## @@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                 continue;
             }
             Node node = entry.getKey();
+            if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                log.trace("Still waiting for other calls to finish on node {}.", node);
+                nodeReadyDeadlines.remove(node);
+                continue;
```

Review comment: Notice that we set `maxInFlightRequestsPerConnection` to 1 when constructing the `NetworkClient`. We don't support sending multiple requests to a single node on a single connection in `AdminClient`. I think we could add this support, but we'd have to check how the server handles it, since we've never done it before. Maybe there should be a JIRA.

Also, if we do choose to add this support for multiple outstanding requests per node per socket, we'd need some way to distinguish between "waiting for a chance to use this connection" and "waiting for this connection to be opened" in NetworkClient. Currently `ready` just returns a boolean, which isn't enough information to distinguish these two cases. We could probably add a new method that returned an enum or something.
[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready
cmccabe commented on a change in pull request #10281: URL: https://github.com/apache/kafka/pull/10281#discussion_r595632280

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```
## @@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                 continue;
             }
             Node node = entry.getKey();
+            if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                log.trace("Still waiting for other calls to finish on node {}.", node);
+                nodeReadyDeadlines.remove(node);
+                continue;
+            }
             if (!client.ready(node, now)) {
+                Long deadline = nodeReadyDeadlines.get(node);
+                if (deadline != null) {
+                    if (now >= deadline) {
+                        log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                            "because the node is taking too long to become ready.",
+                            node.idString(), calls.size());
+                        transitionToPendingAndClearList(calls);
+                        client.disconnect(node.idString());
+                        nodeReadyDeadlines.remove(node);
+                        iter.remove();
+                        continue;
+                    }
+                    pollTimeout = Math.min(pollTimeout, deadline - now);
+                } else {
+                    nodeReadyDeadlines.put(node, now + requestTimeoutMs);
+                }
                 long nodeTimeout = client.pollDelayMs(node, now);
                 pollTimeout = Math.min(pollTimeout, nodeTimeout);
                 log.trace("Client is not ready to send to {}. Must delay {} ms", node, nodeTimeout);
                 continue;
             }
-            Call call = calls.remove(0);
-            int requestTimeoutMs = Math.min(KafkaAdminClient.this.requestTimeoutMs,
+            int remainingRequestTime;
+            Long deadlineMs = nodeReadyDeadlines.remove(node);
```

Review comment: I added a comment.
[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready
cmccabe commented on a change in pull request #10281: URL: https://github.com/apache/kafka/pull/10281#discussion_r595631870

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```
## @@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                 continue;
             }
             Node node = entry.getKey();
+            if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                log.trace("Still waiting for other calls to finish on node {}.", node);
+                nodeReadyDeadlines.remove(node);
+                continue;
+            }
             if (!client.ready(node, now)) {
+                Long deadline = nodeReadyDeadlines.get(node);
+                if (deadline != null) {
+                    if (now >= deadline) {
+                        log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                            "because the node is taking too long to become ready.",
+                            node.idString(), calls.size());
+                        transitionToPendingAndClearList(calls);
+                        client.disconnect(node.idString());
+                        nodeReadyDeadlines.remove(node);
+                        iter.remove();
+                        continue;
+                    }
+                    pollTimeout = Math.min(pollTimeout, deadline - now);
+                } else {
+                    nodeReadyDeadlines.put(node, now + requestTimeoutMs);
```

Review comment: The complexity of the min / max issue is one thing. Another thing is that we don't know when the connection has been established and when it has not. NetworkClient doesn't expose this information. `NetworkClient#ready` may return false for a variety of reasons, many of which are not indicative of connection establishment.
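Abstracting away from the diff above, the bookkeeping under discussion follows a simple pattern: the first time a node is found not ready, record a deadline; if it is still not ready when the deadline passes, give up on the connection. A hypothetical, self-contained sketch of that pattern (class and method names are illustrative, not the actual KafkaAdminClient code):

```java
import java.util.HashMap;
import java.util.Map;

class NodeReadyTracker {
    private final Map<String, Long> nodeReadyDeadlines = new HashMap<>();
    private final long requestTimeoutMs;

    NodeReadyTracker(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    /** Called when the node is not ready; returns true if the caller should disconnect it. */
    boolean onNodeNotReady(String nodeId, long now) {
        Long deadline = nodeReadyDeadlines.get(nodeId);
        if (deadline == null) {
            // First time we see this node not ready: start the clock.
            nodeReadyDeadlines.put(nodeId, now + requestTimeoutMs);
            return false;
        }
        if (now >= deadline) {
            // Node took too long to become ready: give up.
            nodeReadyDeadlines.remove(nodeId);
            return true;
        }
        return false;
    }

    /** Called once the node becomes ready, clearing any pending deadline. */
    void onNodeReady(String nodeId) {
        nodeReadyDeadlines.remove(nodeId);
    }
}
```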
[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready
cmccabe commented on a change in pull request #10281: URL: https://github.com/apache/kafka/pull/10281#discussion_r595631316

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```
## @@ -1136,17 +1185,12 @@ private void timeoutCallsInFlight(TimeoutProcessor processor) {
             // only one we need to check the timeout for.
             Call call = contexts.get(0);
             if (processor.callHasExpired(call)) {
-                if (call.aborted) {
-                    log.warn("Aborted call {} is still in callsInFlight.", call);
-                } else {
-                    log.debug("Closing connection to {} due to timeout while awaiting {}", nodeId, call);
-                    call.aborted = true;
-                    client.disconnect(nodeId);
-                    numTimedOut++;
-                    // We don't remove anything from the callsInFlight data structure. Because the connection
-                    // has been closed, the calls should be returned by the next client#poll(),
-                    // and handled at that point.
-                }
+                log.debug("Disconnecting from {} due to timeout while awaiting {}", nodeId, call);
```

Review comment: OK, let's raise it to `INFO`.
[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton updated KAFKA-12463:
----------------------------------
Description:

Kafka consumers have a pluggable [partition assignment interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] that comes with several out-of-the-box implementations including the [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], and [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html]. If no partition assignor is configured with a consumer, the {{RangeAssignor}} is used by default.

Although there are some benefits to this assignor, including stability of assignment across generations and simplicity of design, it comes with a major drawback: the number of active consumers in a group is limited to the number of partitions in the topic(s) with the most partitions. For an example of the worst case, in a consumer group where every member is subscribed to ten topics that each have one partition, only one member of that group will be assigned any topic partitions. This can end up producing counterintuitive and even frustrating behavior when a sink connector is brought up with N tasks to read from some collection of topics with a total of N topic partitions, but some tasks end up idling and not processing any data.

h3. Proposed Change

*NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will not work, as consumers will still perform eager rebalancing as long as at least one of the partition assignors they are configured with does not support cooperative rebalancing. KAFKA-12487 should also be addressed before configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any sink connectors.*

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable assignment across generations wherever possible, provide the most even assignment possible (taking into account possible differences in subscriptions across consumers in the group), and allow consumers to continue processing data during rebalance. The documentation for the assignor states that "Users should prefer this assignor for newer clusters."

We should alter the default consumer configuration for sink tasks to use the new {{CooperativeStickyAssignor}}. In order to do this in a backwards-compatible fashion that also enables rolling upgrades, this should be implemented by changing the {{Worker}} to set the following on the consumer configuration created for each sink connector task:

{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}

This way, consumer groups for sink connectors on Connect clusters in the process of being upgraded will continue to use the {{RangeAssignor}} until all workers in the cluster have been upgraded, and then will switch over to the new {{CooperativeStickyAssignor}} automatically.

But this setting will be overwritten by any user-specified {{consumer.partition.assignment.strategy}} property in the worker configuration, and by any user-specified {{consumer.override.partition.assignment.strategy}} property in a sink connector configuration when per-connector client overrides are enabled in the worker config with {{connector.client.config.override.policy=ALL}}.

This improvement is viable as far back as -2.3- 2.4, when the {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug fix, it should only be applied to the Connect framework in an upcoming minor release.

h3. Manually setting the partition assignment strategy

There is a simple workaround to achieve the same behavior in AK releases 2.4 and later that don't also include this improvement: either set a value for the {{consumer.partition.assignment.strategy}} property in the *worker configuration*, or set a value for the {{consumer.override.partition.assignment.strategy}} property in one or more *connector configurations* when per-connector client overrides are enabled in the worker config with {{connector.client.config.override.policy=ALL}}.

In order to avoid task failures while the connector is being reconfigured, it is highly recommended that the consumer be configured with a list of both the new and the current partition assignment strategies, instead of just the new partition assignment strategy. For example, to update a
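A minimal Java sketch of the dual-strategy consumer configuration the description recommends (property values assumed from the text above; in a Connect worker the key would be prefixed with {{consumer.}} or {{consumer.override.}}, and per the warning in the description this should not be rolled out until KAFKA-12487 is resolved):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public final class AssignorUpgradeConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // List the new (preferred) assignor first and the current one second:
        // the group keeps using RangeAssignor until every member supports the
        // cooperative strategy, then switches over without breaking anyone.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName());
        return props;
    }
}
```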
[jira] [Comment Edited] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions
[ https://issues.apache.org/jira/browse/KAFKA-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302980#comment-17302980 ]

A. Sophie Blee-Goldman edited comment on KAFKA-12485 at 3/17/21, 12:04 AM:
---------------------------------------------------------------------------

Side note: if we get this optimization in then we should be able to skip the last(?) followup for KIP-572: handling TimeoutExceptions thrown from the mainConsumer#committed call when reinitializing offsets for corrupted tasks in TaskManager#closeAndRevive. cc [~mjsax]

> Speed up Consumer#committed by returning cached offsets for owned partitions
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-12485
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12485
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: A. Sophie Blee-Goldman
>            Priority: Major
>              Labels: newbie++
>
> All of the KafkaConsumer#committed APIs will currently make a remote blocking call to the server to fetch the committed offsets. This is typically used to reset the offsets after a crash or restart, or to fetch offsets for other consumers in the group. However some users may wish to invoke this API on partitions which are currently owned by the Consumer, in which case the remote call is unnecessary since it should be able to just keep track of what it has itself committed.
>
> We should consider optimizing these APIs to just return the cached offsets in place of the remote call when passed in only partitions that are currently owned. This is similar to what we do in Consumer#position, although there we have a guarantee that the partitions are owned by the Consumer whereas in #committed we do not
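The ticket proposes answering {{Consumer#committed}} from a local cache when every requested partition is currently owned. A hypothetical illustration of that check (names such as {{fetchCommittedOffsetsFromBroker}} are stand-ins, not the real consumer internals):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetCache {
    private final Map<TopicPartition, OffsetAndMetadata> cache = new HashMap<>();

    // Record what we committed ourselves.
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        cache.putAll(offsets);
    }

    // Serve from the cache only when every requested partition is owned and has
    // a locally known commit; otherwise fall back to the remote lookup.
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions,
                                                            Set<TopicPartition> assignment) {
        if (assignment.containsAll(partitions) && cache.keySet().containsAll(partitions)) {
            Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
            for (TopicPartition tp : partitions) {
                result.put(tp, cache.get(tp));
            }
            return result; // no broker round trip needed
        }
        return fetchCommittedOffsetsFromBroker(partitions);
    }

    // Stand-in for the existing blocking remote call.
    private Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsetsFromBroker(Set<TopicPartition> partitions) {
        throw new UnsupportedOperationException("remote path not sketched here");
    }
}
```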
[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302981#comment-17302981 ] Chris Egerton commented on KAFKA-12463: --- Fun fact--Connect does not like cooperative consumers. See https://issues.apache.org/jira/browse/KAFKA-12487 for details. Until/unless that's addressed, we can't change the default partition assignor for Connect to the {{CooperativeStickyAssignor}}. [~rhauch] thanks for the rewrite, I hope that makes things easier to read. Responses inline: {quote}Specifically, isn't it true that after the Connect worker config is changed, one rolling restart of the Connect workers will not immediately use the cooperative assignor (since the last worker to restart will only use the range assignor (the default), and that the cooperative assignor will only be used after a second rolling restart. {quote} That's true if the second rolling restart includes a change to the partition assignment strategy for its consumers to only use the {{CooperativeStickyAssignor}} and remove the {{RangeAssignor}} from the list, yes. Given that and the recently discovered KAFKA-12487, I don't even think we're at the point of updating the description yet to mention an upgrade process that would accommodate the {{CooperativeStickyAssignor}} without having a warning notice preceding any hypothetical designs we might implement once both of these pain points are addressed. I've updated the description accordingly; feel free to make any edits as long as we don't actually instruct users how to configure their workers with the {{CooperativeStickyAssignor}}, as that will lead to bad worker behavior. {quote}In both standalone and distributed, Connect only updates the task configs if and only if the task configs changed. If a user *only* changes the {{consumer.override.partition.assignment.strategy}} property in a connector config, then the task configs will not be changed and the tasks will not be restarted. {quote} Ah yes, KAFKA-9228. When I initially discovered that one I was a little alarmed, but it turns out that the overwhelming majority of connectors out there are unaffected, just by a quirk of how people tend to implement {{Connector::taskConfigs}}. The only cases I've been able to find where this bug comes up are in the file stream connectors. If we believe this is likely to affect other connectors, I personally think we should be addressing that bug instead of working around it or documenting it as a potential gotcha. {quote}If Connect does attempt to restart the connector's tasks, the herder does not wait for the tasks to stop before starting any of them. This means that it may take several such connector & task restarts before the cooperative partition assignment strategy will take effect. {quote} That's a fair point, and it applies to any change of partition assignment strategy, not just specifically moving from an eager to a cooperative one. This becomes especially likely if a task isn't able to respond to a shutdown request within the graceful shutdown period (which defaults to five seconds). The workaround here is to enable both partition assignment strategies for the consumer with a preference for the desired strategy; that way, the desired strategy will take effect as soon as every consumer in the group has been updated, and nobody will break beforehand. I'll update the workaround section in the description to include that info. 
I'd like to point out that the goal here is to improve the out-of-the-box behavior of Connect for users; although workarounds are nice to have, the goal here shouldn't be to focus on documenting them but instead, to make them obsolete. If we decide not to improve the default behavior of Connect then we can document this somewhere else that's a little more visible for users as opposed to developers. > Update default consumer partition assignor for sink tasks > - > > Key: KAFKA-12463 > URL: https://issues.apache.org/jira/browse/KAFKA-12463 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > Kafka consumers have a pluggable [partition assignment > interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] > that comes with several out-of-the-box implementations including the > [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], > > [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], > > [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], > and >
[jira] [Commented] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions
[ https://issues.apache.org/jira/browse/KAFKA-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302980#comment-17302980 ] A. Sophie Blee-Goldman commented on KAFKA-12485: Side note: if we get this optimization in then we should be able to skip the last(?) followup for KIP-572: handling TimeoutExceptions thrown from the mainConsumer#committed call when reinitializing offsets for corrupted tasks in TaskManager#closeAndRevive. cc [~mjsax] > Speed up Consumer#committed by returning cached offsets for owned partitions > > > Key: KAFKA-12485 > URL: https://issues.apache.org/jira/browse/KAFKA-12485 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: newbie++ > > All of the KafkaConsumer#committed APIs will currently make a remote blocking > call to the server to fetch the committed offsets. This is typically used to > reset the offsets after a crash or restart, or to fetch offsets for other > consumers in the group. However some users may wish to invoke this API on > partitions which are currently owned by the Consumer, in which case the > remote call is unnecessary since it should be able to just keep track of what > it has itself committed. > We should consider optimizing these APIs to just return the cached offsets in > place of the remote call when passed in only partitions that are currently > owned. This is similar to what we do in Consumer#position, although there we > have a guarantee that the partitions are owned by the Consumer whereas in > #committed we do not -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions
[ https://issues.apache.org/jira/browse/KAFKA-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12485: --- Description: All of the KafkaConsumer#committed APIs will currently make a remote blocking call to the server to fetch the committed offsets. This is typically used to reset the offsets after a crash or restart, or to fetch offsets for other consumers in the group. However some users may wish to invoke this API on partitions which are currently owned by the Consumer, in which case the remote call is unnecessary since it should be able to just keep track of what it has itself committed. We should consider optimizing these APIs to just return the cached offsets in place of the remote call when passed in only partitions that are currently owned. This is similar to what we do in Consumer#position, although there we have a guarantee that the partitions are owned by the Consumer whereas in #committed we do not was: All of the KafkaConsumer#committed APIs will currently make a remote blocking call to the server to fetch the committed offsets. This is typically used to reset the offsets after a crash or restart, or to fetch offsets for other consumers in the group. However some users may wish to invoke this API on partitions which are currently owned by the Consumer, in which case the remote call is unnecessary since those offsets should already be known. We should consider optimizing these APIs to just return the cached offsets in place of the remote call when passed in only partitions that are currently owned. This is similar to what we do in Consumer#position, although there we have a guarantee that the partitions are owned by the Consumer whereas in #committed we do not > Speed up Consumer#committed by returning cached offsets for owned partitions > > > Key: KAFKA-12485 > URL: https://issues.apache.org/jira/browse/KAFKA-12485 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: newbie++ > > All of the KafkaConsumer#committed APIs will currently make a remote blocking > call to the server to fetch the committed offsets. This is typically used to > reset the offsets after a crash or restart, or to fetch offsets for other > consumers in the group. However some users may wish to invoke this API on > partitions which are currently owned by the Consumer, in which case the > remote call is unnecessary since it should be able to just keep track of what > it has itself committed. > We should consider optimizing these APIs to just return the cached offsets in > place of the remote call when passed in only partitions that are currently > owned. This is similar to what we do in Consumer#position, although there we > have a guarantee that the partitions are owned by the Consumer whereas in > #committed we do not -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-12463: -- Description: Kafka consumers have a pluggable [partition assignment interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] that comes with several out-of-the-box implementations including the [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], and [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html]. If no partition assignor is configured with a consumer, the {{RangeAssignor}} is used by default. Although there are some benefits to this assignor including stability of assignment across generations and simplicity of design, it comes with a major drawback: the number of active consumers in a group is limited to the number of partitions in the topic(s) with the most partitions. For an example of the worst case, in a consumer group where every member is subscribed to ten topics that each have one partition, only one member of that group will be assigned any topic partitions. This can end up producing counterintuitive and even frustrating behavior when a sink connector is brought up with N tasks to read from some collection of topics with a total of N topic partitions, but some tasks end up idling and not processing any data. h3. Proposed Change *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will not work as consumers will still perform eager rebalancing as long as at least one of the partition assignors they are configured with does not support cooperative rebalancing. KAFKA-12487 should also be addressed before configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any sink connectors.* [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable assignment across generations wherever possible, provide the most even assignment possible (taking into account possible differences in subscriptions across consumers in the group), and allow consumers to continue processing data during rebalance. The documentation for the assignor states that "Users should prefer this assignor for newer clusters." We should alter the default consumer configuration for sink tasks to use the new {{CooperativeStickyAssignor}}. In order to do this in a backwards-compatible fashion that also enables rolling upgrades, this should be implemented by changing the {{Worker}} to set the following on the consumer configuration created for each sink connector task: {code:java} partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor {code} This way, consumer groups for sink connectors on Connect clusters in the process of being upgraded will continue to use the {{RangeAssignor}} until all workers in the cluster have been upgraded, and then will switch over to the new {{CooperativeStickyAssignor}} automatically. 
But this setting will be overwritten by any user-specified {{consumer.partition.assignment.strategy}} property in the worker configuration, and by any user-specified {{consumer.override.partition.assignment.strategy}} property in a sink connector configuration when per-connector client overrides are enabled in the worker config with {{connector.client.config.override.policy=ALL}}. This improvement is viable as far back as -2.3- 2.4, when the {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug fix, it should only be applied to the Connect framework in an upcoming minor release. h3. Manually setting the partition assignment strategy There is a simple workaround to achieve the same behavior in AK releases 2.4 and later that don't also include this improvement: either set a value for the {{consumer.partition.assignment.strategy}} property in the *worker configuration, or* set a value for the {{consumer.override.partition.assignment.strategy}} property in one or more *connector configurations* when per-connector client overrides are enabled in the worker config with {{connector.client.config.override.policy=ALL}}. was: Kafka consumers have a pluggable [partition assignment interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] that comes with several out-of-the-box implementations including the
[GitHub] [kafka] rondagostino commented on pull request #10292: MINOR: fix client_compatibility_features_test.py
rondagostino commented on pull request #10292: URL: https://github.com/apache/kafka/pull/10292#issuecomment-800685338 > Can we just use a separate node for the client, unconditionally We don't have a `Service` for it, and we currently invoke it on a node that we allocate for the test. With no ZooKeeper, all we have is Kafka (version >= 2.8, of course). I could see a situation where we add functionality to `org.apache.kafka.tools.ClientCompatibilityTest` in some future version and then we try to run that against Kafka v2.8 -- and then it'll go BOOM! as you suggest. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-12463: -- Description: Kafka consumers have a pluggable [partition assignment interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] that comes with several out-of-the-box implementations including the [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], and [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html]. If no partition assignor is configured with a consumer, the {{RangeAssignor}} is used by default. Although there are some benefits to this assignor including stability of assignment across generations and simplicity of design, it comes with a major drawback: the number of active consumers in a group is limited to the number of partitions in the topic(s) with the most partitions. For an example of the worst case, in a consumer group where every member is subscribed to ten topics that each have one partition, only one member of that group will be assigned any topic partitions. This can end up producing counterintuitive and even frustrating behavior when a sink connector is brought up with N tasks to read from some collection of topics with a total of N topic partitions, but some tasks end up idling and not processing any data. h3. Proposed Change *NOTE: Until/unless* [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable assignment across generations wherever possible, provide the most even assignment possible (taking into account possible differences in subscriptions across consumers in the group), and allow consumers to continue processing data during rebalance. The documentation for the assignor states that "Users should prefer this assignor for newer clusters." We should alter the default consumer configuration for sink tasks to use the new {{CooperativeStickyAssignor}}. In order to do this in a backwards-compatible fashion that also enables rolling upgrades, this should be implemented by changing the {{Worker}} to set the following on the consumer configuration created for each sink connector task: {code:java} partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor {code} This way, consumer groups for sink connectors on Connect clusters in the process of being upgraded will continue to use the {{RangeAssignor}} until all workers in the cluster have been upgraded, and then will switch over to the new {{CooperativeStickyAssignor}} automatically. But, this setting will be overwritten by any user-specified {{consumer.partition.assignment.strategy}} property in the worker configuration, and by any user-specified {{consumer.override.partition.assignment.strategy}} property in a sink connector configuration when per-connector client overrides is enabled in the worker config with {{connector.client.config.override.policy=ALL}}. 
This improvement is viable as far back as -2.3- 2.4, when the {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug fix, it should only be applied to the Connect framework in an upcoming minor release. h3. Manually setting the partition assignment strategy There is a simple workaround to achieve the same behavior in AK releases 2.4 and later that don't also include this fix: either set the following in the *worker configuration*: {code:java} consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor {code} or set the following in *connector configurations* when per-connector client overrides are enabled in the worker config with {{connector.client.config.override.policy=ALL}}: {code:java} consumer.override.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor {code} was: Kafka consumers have a pluggable [partition assignment interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] that comes with several out-of-the-box implementations including the [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
[GitHub] [kafka] rondagostino commented on pull request #10292: MINOR: fix client_compatibility_features_test.py
rondagostino commented on pull request #10292: URL: https://github.com/apache/kafka/pull/10292#issuecomment-800679385 > use the devel version of the tool when ZK is enabled, and the old version when it's not The code as amended in this PR will use the devel version when ZooKeeper is enabled (which is the way it always was except after I changed/broke it) and then, when ZooKeeper is not enabled -- which is a new case -- it will use the Kafka version, which by definition will be >= 2.8. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12473) Make the CooperativeStickyAssignor the default assignor
[ https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302970#comment-17302970 ] Chris Egerton commented on KAFKA-12473: --- This doesn't necessarily block the change proposed here, but we should take care to make sure that https://issues.apache.org/jira/browse/KAFKA-12487 doesn't cause sink connectors to suddenly begin failing on Connect workers after an upgrade to 3.0. It'd be fantastic if we could patch it in time for 3.0, but if not, we'll have to hard-code Connect to use a different, non-cooperative partition assignor by default. > Make the CooperativeStickyAssignor the default assignor > --- > > Key: KAFKA-12473 > URL: https://issues.apache.org/jira/browse/KAFKA-12473 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: Luke Chen >Priority: Critical > Labels: needs-kip > Fix For: 3.0.0 > > > Now that 3.0 is coming up, we can change the default > ConsumerPartitionAssignor to something better than the RangeAssignor. The > original plan was to switch over to the StickyAssignor, but now that we have > incremental cooperative rebalancing we should consider using the new > CooperativeStickyAssignor instead: this will enable the consumer group to > follow the COOPERATIVE protocol, improving the rebalancing experience OOTB. > Note that this will require users to follow the [upgrade path laid out in > KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429:+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-Consumer] > to safely perform a rolling upgrade. When we change the default assignor we > need to make sure this is clearly documented in the upgrade guide for 3.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rondagostino edited a comment on pull request #10297: MINOR: fix failing ZooKeeper system tests
rondagostino edited a comment on pull request #10297: URL: https://github.com/apache/kafka/pull/10297#issuecomment-800676314 @cmccabe All set. I added the comment and also opened https://issues.apache.org/jira/browse/KAFKA-12488 for the refactor. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #10297: MINOR: fix failing ZooKeeper system tests
rondagostino commented on pull request #10297: URL: https://github.com/apache/kafka/pull/10297#issuecomment-800676314 @cmccabe All set. I added the comment and also opened https://issues.apache.org/jira/browse/KAFKA-12488 for the refactor. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12488) Be more specific about enabled SASL mechanisms in system tests
Ron Dagostino created KAFKA-12488: - Summary: Be more specific about enabled SASL mechanisms in system tests Key: KAFKA-12488 URL: https://issues.apache.org/jira/browse/KAFKA-12488 Project: Kafka Issue Type: Improvement Components: system tests Reporter: Ron Dagostino The `SecurityConfig.enabled_sasl_mechanisms()` method simply returns all SASL mechanisms that are enabled for the test -- whether for brokers, clients, controllers, or ZooKeeper. This full list of enabled mechanisms is then used to determine what appears in the generated JAAS config files. For example, the entire list of enabled mechanisms is used in both KafkaClient{} and KafkaServer{} sections, but that's way too broad. We should be more precise about what mechanisms we are interested in for the different sections of these JAAS config files. -- This message was sent by Atlassian Jira (v8.3.4#803005)
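To make the scope of the problem concrete, an illustration of the two JAAS sections in question (login modules and credentials here are made up; the real templates live in the system test resources). The server-side section only needs the broker's mechanisms and the client-side section only the client's, yet both currently receive the union of everything `enabled_sasl_mechanisms()` returns:

{code}
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="client"
    password="client-secret";
};
{code}

With the current behavior, a ZooKeeper-only or controller-only mechanism enabled anywhere in the test would also have its login module emitted into both of these sections.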
[jira] [Created] (KAFKA-12487) Sink connectors do not work with the cooperative consumer rebalance protocol
Chris Egerton created KAFKA-12487: - Summary: Sink connectors do not work with the cooperative consumer rebalance protocol Key: KAFKA-12487 URL: https://issues.apache.org/jira/browse/KAFKA-12487 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 3.0.0, 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2 Reporter: Chris Egerton Assignee: Chris Egerton The {{ConsumerRebalanceListener}} used by the framework to respond to rebalance events in consumer groups for sink tasks is hard-coded with the assumption that the consumer performs rebalances eagerly. In other words, it assumes that whenever {{onPartitionsRevoked}} is called, all partitions have been revoked from that consumer, and whenever {{onPartitionsAssigned}} is called, the partitions passed in to that method comprise the complete set of topic partitions assigned to that consumer. See the [WorkerSinkTask.HandleRebalance class|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L730] for the specifics. One issue this can cause is silently ignoring to-be-committed offsets provided by sink tasks, since the framework ignores offsets provided by tasks in their {{preCommit}} method if it does not believe that the consumer for that task is currently assigned the topic partition for that offset. See these lines in the [WorkerSinkTask::commitOffsets method|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L429-L430] for reference. This may not be the only issue caused by configuring a sink connector's consumer to use cooperative rebalancing. Rigorous unit and integration testing should be added before claiming that the Connect framework supports the use of cooperative consumers with sink connectors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
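For context on the eager assumption described above: under the eager protocol, {{onPartitionsRevoked}} receives the consumer's entire assignment and {{onPartitionsAssigned}} receives the complete new assignment, while under the cooperative protocol both callbacks carry only the partitions that changed. A minimal sketch of a listener written for cooperative semantics (illustrative only, not the eventual WorkerSinkTask fix):

{code:java}
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Cooperative-safe listener: callbacks are treated as deltas, so the tracked
// assignment stays correct under both the eager and cooperative protocols.
class IncrementalRebalanceListener implements ConsumerRebalanceListener {
    private final Set<TopicPartition> currentAssignment = new HashSet<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
        // Cooperative: 'revoked' may be a subset; only drop what was revoked.
        currentAssignment.removeAll(revoked);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> added) {
        // Cooperative: 'added' contains only newly assigned partitions, so
        // accumulate rather than clear-and-replace.
        currentAssignment.addAll(added);
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> lost) {
        // Partitions were lost without a clean revocation (e.g. session timeout).
        currentAssignment.removeAll(lost);
    }

    Set<TopicPartition> assignment() {
        return currentAssignment;
    }
}
{code}

A listener like HandleRebalance that instead rebuilds its whole view on every {{onPartitionsAssigned}} call silently drops the partitions that were carried over, which is exactly how the offset-commit filtering described above goes wrong.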
[GitHub] [kafka] cmccabe commented on pull request #10292: MINOR: fix client_compatibility_features_test.py
cmccabe commented on pull request #10292: URL: https://github.com/apache/kafka/pull/10292#issuecomment-800670554 @rondagostino : thanks for the explanation. It's a bit weird to use the devel version of the tool when ZK is enabled, and the old version when it's not. I think this will lead to some odd issues down the road. @chia7712 : Can we just use a separate node for the client, unconditionally? Then that separate node can always have the devel software. Or, I suppose, we could change the path that the compat tool is invoked via (maybe that's annoying, though...) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #10297: MINOR: fix failing ZooKeeper system tests
cmccabe commented on pull request #10297: URL: https://github.com/apache/kafka/pull/10297#issuecomment-800668936 Sure, we can refactor this later if that's easier. Can you add a comment to `SecurityConfig#enabled_sasl_mechanisms` describing what it's supposed to return, though? If it should return every possible sasl mechanism in use (zk, controller, broker, client) then let's document that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12482) Remove deprecated Connect worker configs
[ https://issues.apache.org/jira/browse/KAFKA-12482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302957#comment-17302957 ] Randall Hauch commented on KAFKA-12482: --- We've not yet decided whether it's worth it to remove these. See [Changing Defaults in Connect 3.0.0 wiki page|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177047362] and discussion thread. > Remove deprecated Connect worker configs > > > Key: KAFKA-12482 > URL: https://issues.apache.org/jira/browse/KAFKA-12482 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Randall Hauch >Priority: Critical > Fix For: 3.0.0 > > > The following Connect worker configuration properties were deprecated and > should be removed in 3.0.0: > * {{rest.host.name}} (deprecated in > [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface]) > * {{rest.port}} (deprecated in > [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface]) > * {{internal.key.converter}} (deprecated in > [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig]) > * {{internal.value.converter}} (deprecated in > [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig]) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status
[ https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302952#comment-17302952 ] A. Sophie Blee-Goldman commented on KAFKA-12472: By the way, I just filed https://issues.apache.org/jira/browse/KAFKA-12486 which could be relevant as it would add another possible reason for Streams to trigger a rebalance > Add a Consumer / Streams metric to indicate the current rebalance status > > > Key: KAFKA-12472 > URL: https://issues.apache.org/jira/browse/KAFKA-12472 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip > > Today to trouble shoot a rebalance issue operators need to do a lot of manual > steps: locating the problematic members, search in the log entries, and look > for related metrics. It would be great to add a single metric that covers all > these manual steps and operators would only need to check this single signal > to check what is the root cause. A concrete idea is to expose two enum gauge > metrics on consumer and streams, respectively: > * Consumer level (the order below is by-design, see Streams level for > details): > 0. *None* => there is no rebalance on going. > 1. *CoordinatorRequested* => any of the coordinator response contains a > RebalanceInProgress error code. > 2. *NewMember* => when the join group response has a MemberIdRequired error > code. > 3. *UnknownMember* => when any of the coordinator response contains an > UnknownMember error code, indicating this member is already kicked out of the > group. > 4. *StaleMember* => when any of the coordinator response contains an > IllegalGeneration error code. > 5. *DroppedGroup* => when hb thread decides to call leaveGroup due to hb > expired. > 6. *UserRequested* => when leaveGroup upon the shutdown / unsubscribeAll > API, as well as upon calling the enforceRebalance API. > 7. *MetadataChanged* => requestRejoin triggered since metadata has changed. > 8. *SubscriptionChanged* => requestRejoin triggered since subscription has > changed. > 9. *RetryOnError* => when join/syncGroup response contains a retriable > error which would cause the consumer to backoff and retry. > 10. *RevocationNeeded* => requestRejoin triggered since revoked partitions > is not empty. > The transition rule is that a non-zero status code can only transit to zero > or to a higher code, but not to a lower code (same for streams, see > rationales below). > * Streams level: today a streams client can have multiple consumers. We > introduced some new enum states as well as aggregation rules across > consumers: if there's no streams-layer events as below that transits its > status (i.e. streams layer think it is 0), then we aggregate across all the > embedded consumers and take the largest status code value as the streams > metric; if there are streams-layer events that determines its status should > be in 10+, then it ignores all embedded consumer layer status code since it > should has a higher precedence. In addition, when create aggregated metric > across streams instance (a.k.a at the app-level, which is usually what we > would care and alert on), we also follow the same aggregation rule, e.g. if > there are two streams instance where one instance's status code is 1), and > the other is 10), then the app's status is 10). > 10. 
*RevocationNeeded* => the definition of this is changed to the original > 10) defined in consumer above, OR leader decides to revoke either > active/standby tasks and hence schedule follow-ups. > 11. *AssignmentProbing* => leader decides to schedule follow-ups since the > current assignment is unstable. > 12. *VersionProbing* => leader decides to schedule follow-ups due to version > probing. > 13. *EndpointUpdate* => anyone decides to schedule follow-ups due to > endpoint updates. > The main motivations of the above proposed precedence order are the following: > 1. When a rebalance is triggered by one member, all other members would only > know it is due to CoordinatorRequested from coordinator error codes, and > hence CoordinatorRequested should be overridden by any other status when > aggregating across clients. > 2. DroppedGroup could cause unknown/stale members that would fail and retry > immediately, and hence should take higher precedence. > 3. Revocation definition is extended in Streams, and hence it needs to take > the highest precedence among all consumer-only status so that it would not be > overridden by any of the consumer-only status. > 4. In general, more rare events get higher precedence. > This is proposed on top of KAFKA-12352. Any comments on the precedence rules > / categorization
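As a rough sketch of how an enum gauge like the one proposed above might be registered with the client metrics library (all names are illustrative; this is not a design commitment):

{code:java}
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;

// Hypothetical enum gauge exposing the current rebalance status code,
// enforcing the proposed rule that a non-zero status can only transit
// to zero or to a higher code.
enum RebalanceStatus {
    NONE, COORDINATOR_REQUESTED, NEW_MEMBER, UNKNOWN_MEMBER, STALE_MEMBER,
    DROPPED_GROUP, USER_REQUESTED, METADATA_CHANGED, SUBSCRIPTION_CHANGED,
    RETRY_ON_ERROR, REVOCATION_NEEDED
}

class RebalanceStatusGauge {
    private volatile RebalanceStatus status = RebalanceStatus.NONE;

    void transitionTo(RebalanceStatus next) {
        // Accept only transitions to NONE or to an equal-or-higher code.
        if (next == RebalanceStatus.NONE || next.ordinal() >= status.ordinal())
            status = next;
    }

    void register(Metrics metrics) {
        MetricName name = metrics.metricName("rebalance-status", "consumer-coordinator-metrics");
        metrics.addMetric(name, (Gauge<Integer>) (config, now) -> status.ordinal());
    }
}
{code}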
[GitHub] [kafka] rondagostino commented on pull request #10297: MINOR: fix failing ZooKeeper system tests
rondagostino commented on pull request #10297: URL: https://github.com/apache/kafka/pull/10297#issuecomment-800653669 Yeah, there never has been a clear delineation between "what SASL mechanisms are enabled for Kafka" vs. "what SASL mechanisms are enabled for ZooKeeper". I had to tease this apart for Kafka brokers vs. Kafka Raft controllers (see `serves_raft_sasl` and `uses_raft_sasl`). The same kind of teasing apart could be done for Kafka vs. ZooKeeper as well. Perhaps we can open a ticket for this and leave it for another time? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #10269: KAFKA-12410 KafkaApis ought to group fetch data before generating fet…
jolshan commented on pull request #10269: URL: https://github.com/apache/kafka/pull/10269#issuecomment-800648300 @chia7712 is there any work in progress for a KafkaApis.handleFetchRequest test? I suspect it would be similar but maybe a bit harder than what I did for the LeaderAndIsr version https://github.com/apache/kafka/pull/10071 (trading replicamanager for fetchmanager, etc). This benchmark would be helpful for https://github.com/apache/kafka/pull/9944 as you could probably guess :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #10297: MINOR: fix failing ZooKeeper system tests
cmccabe commented on pull request #10297: URL: https://github.com/apache/kafka/pull/10297#issuecomment-800646841 I'm struggling a bit to understand whether enabled_sasl_mechanisms is supposed to be for the broker or for ZK. It seems like you're treating it like it's for both, but then in a few other places, like admin_client_as_broker_jaas.conf, it seems to be just the broker. Or did I miss something here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #10292: MINOR: fix client_compatibility_features_test.py
rondagostino commented on pull request #10292: URL: https://github.com/apache/kafka/pull/10292#issuecomment-800644753 I broke this with https://github.com/apache/kafka/pull/10105/files#diff-84e14a0909d232b70f0a957ded161cd077d4dc1d069bbaab8e1bacc8dd2e0572L84-R85. The test used to always use the ZooKeeper node, and I changed it to use the Kafka node not realizing that the two distribution versions would be different. We want it to always use at least version 2.7, which we can get by using either the ZooKeeper node for the ZooKeeper case or the Kafka node for the Raft case. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12483) Enable client overrides in connector configs by default
[ https://issues.apache.org/jira/browse/KAFKA-12483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302933#comment-17302933 ] Randall Hauch commented on KAFKA-12483: --- The draft PR implements the changes proposed in [KIP-722|https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default]. They should not be merged until the KIP is approved. > Enable client overrides in connector configs by default > --- > > Key: KAFKA-12483 > URL: https://issues.apache.org/jira/browse/KAFKA-12483 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Labels: needs-kip > Fix For: 3.0.0 > > > Connector-specific client overrides were added in > [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy], > but that feature is not enabled by default since it would not have been > backward compatible. > But with AK 3.0.0, we have the opportunity to enable connector client > overrides by default by changing the worker config's > {{connector.client.config.override.policy}} default value to {{All}}. > See > [KIP-722|https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12484) Enable Connect's connector log contexts by default
[ https://issues.apache.org/jira/browse/KAFKA-12484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302932#comment-17302932 ] Randall Hauch commented on KAFKA-12484: --- The draft PR implements the changes proposed in [KIP-721|https://cwiki.apache.org/confluence/display/KAFKA/KIP-721%3A+Enable+connector+log+contexts+in+Connect+Log4j+configuration]. They should not be merged until the KIP is approved. > Enable Connect's connector log contexts by default > -- > > Key: KAFKA-12484 > URL: https://issues.apache.org/jira/browse/KAFKA-12484 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Labels: needs-kip > Fix For: 3.0.0 > > > Connect's Log4J configuration does not by default log the connector contexts. > That feature was added in > [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs] > and first appeared in AK 2.3.0, but it was not enabled by default since that > would not have been backward compatible. > But with AK 3.0.0, we have the opportunity to change the default in > {{config/connect-log4j.properties}} to enable connector log contexts. > See > [KIP-721|https://cwiki.apache.org/confluence/display/KAFKA/KIP-721%3A+Enable+connector+log+contexts+in+Connect+Log4j+configuration]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rhauch opened a new pull request #10336: KAFKA-12483: Enable client overrides in connector configs by default (KIP-722)
rhauch opened a new pull request #10336: URL: https://github.com/apache/kafka/pull/10336 **DO NOT MERGE until after [KIP-722](https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default) has been approved!** Changes the default value for the `connector.client.config.override.policy` worker configuration property from `None` to `All`. Modified unit tests to verify all policies still work, and that by default connectors can override all client policies. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
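For a concrete picture of the effect (property values below are illustrative, not from the PR):

```properties
# Worker config: with this change the following becomes the default,
# so the line can be omitted entirely.
connector.client.config.override.policy=All
```

A connector config can then carry per-connector client settings such as `consumer.override.max.poll.records=100` or `producer.override.compression.type=lz4` without any additional worker setup.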
[jira] [Updated] (KAFKA-12486) Utilize HighAvailabilityTaskAssignor to avoid downtime on corrupted task
[ https://issues.apache.org/jira/browse/KAFKA-12486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12486: --- Priority: Critical (was: Major) > Utilize HighAvailabilityTaskAssignor to avoid downtime on corrupted task > > > Key: KAFKA-12486 > URL: https://issues.apache.org/jira/browse/KAFKA-12486 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Critical > > In KIP-441, we added the HighAvailabilityTaskAssignor to address certain > common scenarios which tend to lead to heavy downtime for tasks, such as > scaling out. The new assignor will always place an active task on a client > which has a "caught-up" copy of that tasks' state, if any exists, while the > intended recipient will instead get a standby task to warm up the state in > the background. This way we keep tasks live as much as possible, and avoid > the long downtime imposed by state restoration on active tasks. > We can actually expand on this to reduce downtime due to restoring state: > specifically, we may throw a TaskCorruptedException on an active task which > leads to wiping out the state stores of that task and restoring from scratch. > There are a few cases where this may be thrown: > # No checkpoint found with EOS > # TimeoutException when processing a StreamTask > # TimeoutException when committing offsets under eos > # RetriableException in RecordCollectorImpl > (There is also the case of OffsetOutOfRangeException, but that is excluded > here since it only applies to standby tasks). > We should consider triggering a rebalance when we hit TaskCorruptedException > on an active task so that the assignor has the chance to redirect this to > another client who can resume work on the task while the original owner works > on restoring the state from scratch. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12486) Utilize HighAvailabilityTaskAssignor to avoid downtime on corrupted task
A. Sophie Blee-Goldman created KAFKA-12486: -- Summary: Utilize HighAvailabilityTaskAssignor to avoid downtime on corrupted task Key: KAFKA-12486 URL: https://issues.apache.org/jira/browse/KAFKA-12486 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman In KIP-441, we added the HighAvailabilityTaskAssignor to address certain common scenarios which tend to lead to heavy downtime for tasks, such as scaling out. The new assignor will always place an active task on a client which has a "caught-up" copy of that task's state, if any exists, while the intended recipient will instead get a standby task to warm up the state in the background. This way we keep tasks live as much as possible, and avoid the long downtime imposed by state restoration on active tasks. We can actually expand on this to reduce downtime due to restoring state: specifically, we may throw a TaskCorruptedException on an active task which leads to wiping out the state stores of that task and restoring from scratch. There are a few cases where this may be thrown: # No checkpoint found with EOS # TimeoutException when processing a StreamTask # TimeoutException when committing offsets under EOS # RetriableException in RecordCollectorImpl (There is also the case of OffsetOutOfRangeException, but that is excluded here since it only applies to standby tasks). We should consider triggering a rebalance when we hit TaskCorruptedException on an active task so that the assignor has the chance to redirect this to another client who can resume work on the task while the original owner works on restoring the state from scratch. -- This message was sent by Atlassian Jira (v8.3.4#803005)
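A rough sketch of the triggering idea, using the consumer's {{enforceRebalance}} API from KIP-568. The {{TaskManagerLike}} surface is a stand-in for Streams internals, not an actual patch:

{code:java}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.streams.errors.TaskCorruptedException;

// Hypothetical glue: on corruption, wipe the task's stores and force a
// rebalance so a caught-up client can take over the active task while
// this instance restores from scratch.
class CorruptionRebalanceTrigger {
    void runOnce(TaskManagerLike taskManager, Consumer<byte[], byte[]> mainConsumer) {
        try {
            taskManager.process();
        } catch (TaskCorruptedException e) {
            taskManager.handleCorruption(e);   // wipe state stores, mark for restore
            mainConsumer.enforceRebalance();   // KIP-568: force the group to rebalance
        }
    }

    // Stand-in for the Streams TaskManager surface used above.
    interface TaskManagerLike {
        void process();
        void handleCorruption(TaskCorruptedException e);
    }
}
{code}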
[jira] [Assigned] (KAFKA-12483) Enable client overrides in connector configs by default
[ https://issues.apache.org/jira/browse/KAFKA-12483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch reassigned KAFKA-12483: - Assignee: Randall Hauch > Enable client overrides in connector configs by default > --- > > Key: KAFKA-12483 > URL: https://issues.apache.org/jira/browse/KAFKA-12483 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Labels: needs-kip > Fix For: 3.0.0 > > > Connector-specific client overrides were added in > [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy], > but that feature is not enabled by default since it would not have been > backward compatible. > But with AK 3.0.0, we have the opportunity to enable connector client > overrides by default by changing the worker config's > {{connector.client.config.override.policy}} default value to {{All}}. > See > [KIP-722|https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12483) Enable client overrides in connector configs by default
[ https://issues.apache.org/jira/browse/KAFKA-12483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-12483: -- Description: Connector-specific client overrides were added in [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy], but that feature is not enabled by default since it would not have been backward compatible. But with AK 3.0.0, we have the opportunity to enable connector client overrides by default by changing the worker config's {{connector.client.config.override.policy}} default value to {{All}}. See [KIP-722|https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default]. was: Connector-specific client overrides were added in [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy], but that feature is not enabled by default since it would not have been backward compatible. But with AK 3.0.0, we have the opportunity to enable connector client overrides by default by changing the worker config's {{connector.client.config.override.policy}} default value to \{{All}}. > Enable client overrides in connector configs by default > --- > > Key: KAFKA-12483 > URL: https://issues.apache.org/jira/browse/KAFKA-12483 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Randall Hauch >Priority: Critical > Labels: needs-kip > Fix For: 3.0.0 > > > Connector-specific client overrides were added in > [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy], > but that feature is not enabled by default since it would not have been > backward compatible. > But with AK 3.0.0, we have the opportunity to enable connector client > overrides by default by changing the worker config's > {{connector.client.config.override.policy}} default value to {{All}}. > See > [KIP-722|https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12484) Enable Connect's connector log contexts by default
[ https://issues.apache.org/jira/browse/KAFKA-12484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-12484: -- Description: Connect's Log4J configuration does not by default log the connector contexts. That feature was added in [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs] and first appeared in AK 2.3.0, but it was not enabled by default since that would not have been backward compatible. But with AK 3.0.0, we have the opportunity to change the default in {{config/connect-log4j.properties}} to enable connector log contexts. See [KIP-721|https://cwiki.apache.org/confluence/display/KAFKA/KIP-721%3A+Enable+connector+log+contexts+in+Connect+Log4j+configuration]. was: Connect's Log4J configuration does not by default log the connector contexts. That feature was added in [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs] and first appeared in AK 2.3.0, but it was not enabled by default since that would not have been backward compatible. But with AK 3.0.0, we have the opportunity to change the default in {{config/connect-log4j.properties}} to enable connector log contexts. > Enable Connect's connector log contexts by default > -- > > Key: KAFKA-12484 > URL: https://issues.apache.org/jira/browse/KAFKA-12484 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Labels: needs-kip > Fix For: 3.0.0 > > > Connect's Log4J configuration does not by default log the connector contexts. > That feature was added in > [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs] > and first appeared in AK 2.3.0, but it was not enabled by default since that > would not have been backward compatible. > But with AK 3.0.0, we have the opportunity to change the default in > {{config/connect-log4j.properties}} to enable connector log contexts. > See > [KIP-721|https://cwiki.apache.org/confluence/display/KAFKA/KIP-721%3A+Enable+connector+log+contexts+in+Connect+Log4j+configuration]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rhauch opened a new pull request #10335: KAFKA-12484: Enable Connect's connector log contexts by default (KIP-721) DRAFT
rhauch opened a new pull request #10335: URL: https://github.com/apache/kafka/pull/10335 **DO NOT MERGE until after [KIP-721](https://cwiki.apache.org/confluence/display/KAFKA/KIP-721%3A+Enable+connector+log+contexts+in+Connect+Log4j+configuration) has been approved!** Change the `connect-log4j.properties` file to use the connector log context by default. This feature was previously added in KIP-449, but was not enabled by default. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
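For reference, the mechanism being switched on (shown as a sketch following the existing `config/connect-log4j.properties` layout, not the literal diff): KIP-449 has the Connect runtime publish a `connector.context` MDC value, and the Log4j pattern just needs to include it.

```properties
# Include the connector context (set via MDC by the Connect runtime) in every log line.
connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n
log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
```

With that pattern, each line gains a prefix like `[my-connector|task-0]` identifying which connector and task produced it.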
[GitHub] [kafka] ijuma commented on a change in pull request #10334: MINOR: Fix BaseHashTable sizing
ijuma commented on a change in pull request #10334: URL: https://github.com/apache/kafka/pull/10334#discussion_r595558838 ## File path: metadata/src/main/java/org/apache/kafka/timeline/BaseHashTable.java ## @@ -56,12 +58,30 @@ this.elements = new Object[expectedSizeToCapacity(expectedSize)]; } +/** + * Calculate the capacity we should provision, given the expected size. + * + * Our capacity must always be a power of 2, and never less than 2. + */ static int expectedSizeToCapacity(int expectedSize) { -if (expectedSize <= 1) { -return 2; +if (expectedSize >= MAX_CAPACITY / 2) { +return MAX_CAPACITY; +} +return Math.max(MIN_CAPACITY, roundUpToPowerOfTwo(expectedSize * 2)); +} + +private static int roundUpToPowerOfTwo(int i) { +if (i < 0) { +return 0; } -double sizeToFit = expectedSize / MAX_LOAD_FACTOR; -return (int) Math.min(MAX_CAPACITY, Math.ceil(Math.log(sizeToFit) / LN_2)); +i = i - 1; +i |= i >> 1; +i |= i >> 2; +i |= i >> 4; +i |= i >> 8; +i |= i >> 16; +i = i + 1; +return i < 0 ? MAX_CAPACITY : i; Review comment: Can you do something like: ```java static final int tableSizeFor(int cap) { int n = -1 >>> Integer.numberOfLeadingZeros(cap - 1); return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yatsukav commented on a change in pull request #10333: KAFKA-12481: Add socket.nagle.disable config property
yatsukav commented on a change in pull request #10333: URL: https://github.com/apache/kafka/pull/10333#discussion_r595557313 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -391,6 +392,7 @@ object KafkaConfig { val AdvertisedListenersProp = "advertised.listeners" val ListenerSecurityProtocolMapProp = "listener.security.protocol.map" val ControlPlaneListenerNameProp = "control.plane.listener.name" + val SocketNagleDisableProp = "socket.nagle.disable" Review comment: I took `socket.nagle.disable` from the librdkafka project. But OK, I'll rename it to `socket.tcp.no.delay`; perhaps this is clearer. Thank you for the review. I have requested KIP creation permission on d...@kafka.apache.org. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yatsukav commented on a change in pull request #10333: KAFKA-12481: Add socket.nagle.disable config property
yatsukav commented on a change in pull request #10333: URL: https://github.com/apache/kafka/pull/10333#discussion_r595557313 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -391,6 +392,7 @@ object KafkaConfig { val AdvertisedListenersProp = "advertised.listeners" val ListenerSecurityProtocolMapProp = "listener.security.protocol.map" val ControlPlaneListenerNameProp = "control.plane.listener.name" + val SocketNagleDisableProp = "socket.nagle.disable" Review comment: I took `socket.nagle.disable` from the librdkafka project. But OK, I'll rename it to `socket.tcp.no.delay`; perhaps this is clearer. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe opened a new pull request #10334: MINOR: Fix BaseHashTable sizing
cmccabe opened a new pull request #10334: URL: https://github.com/apache/kafka/pull/10334 The array backing BaseHashTable is intended to be sized as a power of two. Due to a bug, the initial array size was calculated incorrectly in some cases. Also make the maximum array size the largest possible 31-bit power of two. Previously it was a smaller size, but this was due to a typo. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status
[ https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302889#comment-17302889 ] A. Sophie Blee-Goldman commented on KAFKA-12472:
{quote} illegal generation id is usually due to a new member with the same id replaced an old member and then later the old zombie member tries to talk to coordinator (though with the new model, this should mostly be for static members). {quote}
Makes sense – but just wondering, what are you referring to here by "new model"?
{quote}Re. 3: My reasoning is that if a new member joins the group and triggers the rebalance, then from observability point of view we would like to know it's due to "NewMember", not "CoordinatorRequested", since the latter is actually not the "root" cause. This applies to both consumer and streams. {quote}
The motivation here sounds good, as the "CoordinatorRequested" state doesn't really tell you why it's rebalancing – but it does seem to potentially violate the transition rule that "a non-zero status code can only transit to zero or to a higher code". What if you have a new member join and then some other consumer in another client triggers a rebalance, causing the new member to get the RebalanceInProgressException before it's able to complete the first rebalance? Shouldn't it then transition from *NewMember* --> *CoordinatorRequested*? But this transition is disallowed under this rule. Similarly, it seems like a single StreamThread could go from any of the Streams-specific states to almost any of the consumer states, e.g. *AssignmentProbing* --> *DroppedGroup*. I guess I'm wondering: why do we need this transition rule in the first place?
> Add a Consumer / Streams metric to indicate the current rebalance status
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-12472
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12472
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer, streams
>            Reporter: Guozhang Wang
>            Priority: Major
>              Labels: needs-kip
>
> Today, to troubleshoot a rebalance issue, operators need to do a lot of manual steps: locating the problematic members, searching the log entries, and looking for related metrics. It would be great to add a single metric that covers all these manual steps, so operators would only need to check this single signal to find the root cause. A concrete idea is to expose two enum gauge metrics on consumer and streams, respectively:
> * Consumer level (the order below is by-design, see Streams level for details):
> 0. *None* => there is no rebalance ongoing.
> 1. *CoordinatorRequested* => any of the coordinator responses contains a RebalanceInProgress error code.
> 2. *NewMember* => when the join group response has a MemberIdRequired error code.
> 3. *UnknownMember* => when any of the coordinator responses contains an UnknownMember error code, indicating this member is already kicked out of the group.
> 4. *StaleMember* => when any of the coordinator responses contains an IllegalGeneration error code.
> 5. *DroppedGroup* => when the hb thread decides to call leaveGroup due to hb expiry.
> 6. *UserRequested* => when leaving the group upon the shutdown / unsubscribeAll APIs, as well as upon calling the enforceRebalance API.
> 7. *MetadataChanged* => requestRejoin triggered since metadata has changed.
> 8. *SubscriptionChanged* => requestRejoin triggered since subscription has changed.
> 9. *RetryOnError* => when a join/syncGroup response contains a retriable error which would cause the consumer to backoff and retry.
> 10. *RevocationNeeded* => requestRejoin triggered since revoked partitions is not empty.
> The transition rule is that a non-zero status code can only transit to zero or to a higher code, but not to a lower code (same for streams, see rationales below).
> * Streams level: today a streams client can have multiple consumers. We introduced some new enum states as well as aggregation rules across consumers: if there are no streams-layer events (as below) that transit its status (i.e. the streams layer thinks it is 0), then we aggregate across all the embedded consumers and take the largest status code value as the streams metric; if there are streams-layer events that determine its status should be 10+, then it ignores all embedded consumer-layer status codes since it has higher precedence. In addition, when creating an aggregated metric across streams instances (a.k.a. at the app level, which is usually what we would care about and alert on), we also follow the same aggregation rule: e.g. if there are two streams instances where one instance's status code is 1) and the other is 10), then the app's status is 10).
> 10.
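A compact way to read the proposal: the codes form an ordered enum and the contested rule is a monotonicity check. A minimal Java sketch with hypothetical names (this is not actual consumer code):

```java
// Codes mirror the consumer-level list in the JIRA description above.
enum RebalanceStatus {
    NONE(0), COORDINATOR_REQUESTED(1), NEW_MEMBER(2), UNKNOWN_MEMBER(3),
    STALE_MEMBER(4), DROPPED_GROUP(5), USER_REQUESTED(6), METADATA_CHANGED(7),
    SUBSCRIPTION_CHANGED(8), RETRY_ON_ERROR(9), REVOCATION_NEEDED(10);

    private final int code;

    RebalanceStatus(int code) {
        this.code = code;
    }

    // The contested rule: a non-zero code may only move back to zero
    // (rebalance resolved) or to an equal-or-higher code, never lower.
    boolean canTransitionTo(RebalanceStatus next) {
        return next.code == 0 || next.code >= this.code;
    }
}
```

Under that check, the NewMember (2) to CoordinatorRequested (1) transition in Sophie's example is indeed rejected, which is the crux of the question.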
[GitHub] [kafka] ijuma merged pull request #10322: KAFKA-12455: OffsetValidationTest.test_broker_rolling_bounce fail: Raft
ijuma merged pull request #10322: URL: https://github.com/apache/kafka/pull/10322 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #10330: MINOR: Add toString to various Kafka Metrics classes
ijuma merged pull request #10330: URL: https://github.com/apache/kafka/pull/10330 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10330: MINOR: Add toString to various Kafka Metrics classes
ijuma commented on pull request #10330: URL: https://github.com/apache/kafka/pull/10330#issuecomment-800596069 Failures are unrelated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12484) Enable Connect's connector log contexts by default
[ https://issues.apache.org/jira/browse/KAFKA-12484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch reassigned KAFKA-12484: - Assignee: Randall Hauch > Enable Connect's connector log contexts by default > -- > > Key: KAFKA-12484 > URL: https://issues.apache.org/jira/browse/KAFKA-12484 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Critical > Labels: needs-kip > Fix For: 3.0.0 > > > Connect's Log4J configuration does not by default log the connector contexts. > That feature was added in > [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs] > and first appeared in AK 2.3.0, but it was not enabled by default since that > would not have been backward compatible. > But with AK 3.0.0, we have the opportunity to change the default in > {{config/connect-log4j.properties}} to enable connector log contexts. -- This message was sent by Atlassian Jira (v8.3.4#803005)
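Assuming the pattern KIP-449 shipped commented out in `config/connect-log4j.properties`, enabling it by default would presumably amount to something like the following (a sketch; the exact keys may differ in the final change):

```properties
# Emit the per-connector context (set via MDC) at the start of each log line.
connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n
log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}
```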
[GitHub] [kafka] kowshik commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
kowshik commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r595493561

## File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemotePartitionDeleteState.java
## @@ -83,4 +85,25 @@
 public static RemotePartitionDeleteState forId(byte id) {
     return STATE_TYPES.get(id);
 }
+
+public static boolean isValidTransition(RemotePartitionDeleteState srcState,

Review comment: I have the same suggestions here as for `RemoteLogSegmentState`. Please refer to this comment: https://github.com/apache/kafka/pull/10218#discussion_r595492577 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #10333: KAFKA-12481: Add socket.nagle.disable config property
ijuma commented on a change in pull request #10333: URL: https://github.com/apache/kafka/pull/10333#discussion_r595513484 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -391,6 +392,7 @@ object KafkaConfig { val AdvertisedListenersProp = "advertised.listeners" val ListenerSecurityProtocolMapProp = "listener.security.protocol.map" val ControlPlaneListenerNameProp = "control.plane.listener.name" + val SocketNagleDisableProp = "socket.nagle.disable" Review comment: I think I would call this `socket.tcp.no.delay`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
kowshik commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r595492577

## File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
## @@ -87,4 +89,27 @@ public byte id() {
 public static RemoteLogSegmentState forId(byte id) {
     return STATE_TYPES.get(id);
 }
+
+public static boolean isValidTransition(RemoteLogSegmentState srcState, RemoteLogSegmentState targetState) {
+    Objects.requireNonNull(targetState, "targetState can not be null");
+
+    if (srcState == null) {
+        // If the source state is null, check the target state as the initial state viz DELETE_PARTITION_MARKED
+        // Wanted to keep this logic simple here by taking null for srcState, instead of creating one more state like
+        // COPY_SEGMENT_NOT_STARTED and have the null check by caller and pass that state.
+        return targetState == COPY_SEGMENT_STARTED;
+    } else if (srcState == targetState) {

Review comment: 1. Will it be useful to place the implementation of this validation in a separate module, so that it can be reused with `RLMMWithTopicStorage` in the future? 2. A suggestion from the standpoint of code readability: would it make sense to replace the `if-else` logic with a lookup into a `Map<RemoteLogSegmentState, Set<RemoteLogSegmentState>>`, where the key is the source state and the value is the set of allowed target states?

## File path: remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
## @@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class is an implementation of {@link RemoteLogMetadataManager} backed by inmemory store.
+ */
+public class InmemoryRemoteLogMetadataManager implements RemoteLogMetadataManager {

Review comment: We may want to think more about the locking semantics for this class and `RemoteLogMetadataCache`. Are we sure there would _not_ be use cases where we need to serialize mutations across the individually thread-safe attributes? If the answer is no, then using a fine-grained `Object` lock makes more sense because we can use it to guard critical sections. Should we evaluate this upfront? cc @junrao

## File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemotePartitionDeleteState.java
## @@ -83,4 +85,25 @@
 public static RemotePartitionDeleteState forId(byte id) {
     return STATE_TYPES.get(id);
 }
+
+public static boolean isValidTransition(RemotePartitionDeleteState srcState,

Review comment: I have the same suggestions here as for `RemoteLogSegmentState`. Please refer to this comment:

## File path: remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
## @@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the
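A minimal sketch of the lookup-table idea from point 2, with illustrative transitions only (the actual allowed-transition sets live in the PR, not here):

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

enum SegmentState { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

final class SegmentStateTransitions {
    // Key: source state; value: set of allowed target states.
    private static final Map<SegmentState, Set<SegmentState>> VALID = new EnumMap<>(SegmentState.class);
    static {
        VALID.put(SegmentState.COPY_SEGMENT_STARTED,
                EnumSet.of(SegmentState.COPY_SEGMENT_FINISHED, SegmentState.DELETE_SEGMENT_STARTED));
        VALID.put(SegmentState.COPY_SEGMENT_FINISHED,
                EnumSet.of(SegmentState.DELETE_SEGMENT_STARTED));
        VALID.put(SegmentState.DELETE_SEGMENT_STARTED,
                EnumSet.of(SegmentState.DELETE_SEGMENT_FINISHED));
    }

    static boolean isValidTransition(SegmentState src, SegmentState target) {
        // A null source means "initial"; only the first copy state may follow,
        // matching the null-handling convention in the PR's code.
        if (src == null) return target == SegmentState.COPY_SEGMENT_STARTED;
        return VALID.getOrDefault(src, EnumSet.noneOf(SegmentState.class)).contains(target);
    }
}
```

The table makes the full transition graph visible in one place, which is the readability gain the reviewer is after.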
[GitHub] [kafka] ijuma commented on pull request #10333: KAFKA-12481: Add socket.nagle.disable config property
ijuma commented on pull request #10333: URL: https://github.com/apache/kafka/pull/10333#issuecomment-800568236 Nice find. Since this introduces a new config, it would require a KIP: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions
[ https://issues.apache.org/jira/browse/KAFKA-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12485: --- Labels: newbie++ (was: ) > Speed up Consumer#committed by returning cached offsets for owned partitions > > > Key: KAFKA-12485 > URL: https://issues.apache.org/jira/browse/KAFKA-12485 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: newbie++ > > All of the KafkaConsumer#committed APIs will currently make a remote blocking > call to the server to fetch the committed offsets. This is typically used to > reset the offsets after a crash or restart, or to fetch offsets for other > consumers in the group. However some users may wish to invoke this API on > partitions which are currently owned by the Consumer, in which case the > remote call is unnecessary since those offsets should already be known. > We should consider optimizing these APIs to just return the cached offsets in > place of the remote call when passed in only partitions that are currently > owned. This is similar to what we do in Consumer#position, although there we > have a guarantee that the partitions are owned by the Consumer whereas in > #committed we do not -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions
A. Sophie Blee-Goldman created KAFKA-12485: -- Summary: Speed up Consumer#committed by returning cached offsets for owned partitions Key: KAFKA-12485 URL: https://issues.apache.org/jira/browse/KAFKA-12485 Project: Kafka Issue Type: Improvement Components: consumer Reporter: A. Sophie Blee-Goldman All of the KafkaConsumer#committed APIs will currently make a remote blocking call to the server to fetch the committed offsets. This is typically used to reset the offsets after a crash or restart, or to fetch offsets for other consumers in the group. However some users may wish to invoke this API on partitions which are currently owned by the Consumer, in which case the remote call is unnecessary since those offsets should already be known. We should consider optimizing these APIs to just return the cached offsets in place of the remote call when passed in only partitions that are currently owned. This is similar to what we do in Consumer#position, although there we have a guarantee that the partitions are owned by the Consumer whereas in #committed we do not -- This message was sent by Atlassian Jira (v8.3.4#803005)
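A minimal sketch of the proposed fast path, as an external helper; `cachedOffsets` is a hypothetical stand-in for the consumer's internal subscription state, which the real optimization would read directly:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

static Map<TopicPartition, OffsetAndMetadata> committedFast(
        Consumer<?, ?> consumer,
        Set<TopicPartition> partitions,
        Map<TopicPartition, OffsetAndMetadata> cachedOffsets) {
    // Only safe when every requested partition is currently assigned,
    // which is exactly the guarantee #committed lacks compared to #position.
    if (consumer.assignment().containsAll(partitions)) {
        Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
        for (TopicPartition tp : partitions)
            result.put(tp, cachedOffsets.get(tp)); // last known committed offset
        return result;
    }
    return consumer.committed(partitions, Duration.ofSeconds(30)); // remote blocking call
}
```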
[jira] [Updated] (KAFKA-12482) Remove deprecated Connect worker configs
[ https://issues.apache.org/jira/browse/KAFKA-12482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-12482: -- Description: The following Connect worker configuration properties were deprecated and should be removed in 3.0.0: * {{rest.host.name}} (deprecated in [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface]) * {{rest.port}} (deprecated in [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface]) * {{internal.key.converter}} (deprecated in [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig]) * {{internal.value.converter}} (deprecated in [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig]) was: The following Connect worker configuration properties were deprecated and should be removed in 3.0.0: * {{rest.host.name}} (deprecated in [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface]) * {{rest.port}} (deprecated in [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface]) * {{internal.key.converter}} (deprecated in [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig]) * {{internal.value.converter}} (deprecated in [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig]) * sd * > Remove deprecated Connect worker configs > > > Key: KAFKA-12482 > URL: https://issues.apache.org/jira/browse/KAFKA-12482 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Randall Hauch >Priority: Critical > Fix For: 3.0.0 > > > The following Connect worker configuration properties were deprecated and > should be removed in 3.0.0: > * {{rest.host.name}} (deprecated in > [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface]) > * {{rest.port}} (deprecated in > [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface]) > * {{internal.key.converter}} (deprecated in > [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig]) > * {{internal.value.converter}} (deprecated in > [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig]) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12484) Enable Connect's connector log contexts by default
Randall Hauch created KAFKA-12484: - Summary: Enable Connect's connector log contexts by default Key: KAFKA-12484 URL: https://issues.apache.org/jira/browse/KAFKA-12484 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Randall Hauch Fix For: 3.0.0 Connect's Log4J configuration does not by default log the connector contexts. That feature was added in [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs] and first appeared in AK 2.3.0, but it was not enabled by default since that would not have been backward compatible. But with AK 3.0.0, we have the opportunity to change the default in {{config/connect-log4j.properties}} to enable connector log contexts. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12482) Remove deprecated Connect worker configs
Randall Hauch created KAFKA-12482: - Summary: Remove deprecated Connect worker configs Key: KAFKA-12482 URL: https://issues.apache.org/jira/browse/KAFKA-12482 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Randall Hauch Fix For: 3.0.0 The following Connect worker configuration properties were deprecated and should be removed in 3.0.0: * {{rest.host.name}} (deprecated in [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface]) * {{rest.port}} (deprecated in [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface]) * {{internal.key.converter}} (deprecated in [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig]) * {{internal.value.converter}} (deprecated in [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig]) * sd * -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12483) Enable client overrides in connector configs by default
Randall Hauch created KAFKA-12483: - Summary: Enable client overrides in connector configs by default Key: KAFKA-12483 URL: https://issues.apache.org/jira/browse/KAFKA-12483 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Randall Hauch Fix For: 3.0.0 Connector-specific client overrides were added in [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy], but that feature is not enabled by default since it would not have been backward compatible. But with AK 3.0.0, we have the opportunity to enable connector client overrides by default by changing the worker config's {{connector.client.config.override.policy}} default value to {{All}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio commented on a change in pull request #10324: MINOR: Add a few more benchmark for the timeline map
jsancio commented on a change in pull request #10324: URL: https://github.com/apache/kafka/pull/10324#discussion_r595483066

## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
## @@ -44,33 +49,126 @@ public class TimelineHashMapBenchmark {
 private final static int NUM_ENTRIES = 1_000_000;

+@State(Scope.Thread)
+public static class HashMapInput {
+    public HashMap<Integer, String> map;
+    public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+    @Setup(Level.Invocation)
+    public void setup() {
+        map = new HashMap<>(keys.size());
+        for (Integer key : keys) {
+            map.put(key, String.valueOf(key));
+        }
+        Collections.shuffle(keys);
+    }
+}
+
+@State(Scope.Thread)
+public static class ImmutableMapInput {
+    scala.collection.immutable.HashMap<Integer, String> map;
+    public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+    @Setup(Level.Invocation)
+    public void setup() {
+        map = new scala.collection.immutable.HashMap<>();
+        for (Integer key : keys) {
+            map = map.updated(key, String.valueOf(key));
+        }
+        Collections.shuffle(keys);
+    }
+}
+
+@State(Scope.Thread)
+public static class TimelineMapInput {
+    public SnapshotRegistry snapshotRegistry;
+    public TimelineHashMap<Integer, String> map;
+    public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+    @Setup(Level.Invocation)
+    public void setup() {
+        snapshotRegistry = new SnapshotRegistry(new LogContext());
+        map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+        for (Integer key : keys) {
+            map.put(key, String.valueOf(key));
+        }
+        Collections.shuffle(keys);
+    }
+}
+
+@State(Scope.Thread)
+public static class TimelineMapSnapshotInput {
+    public SnapshotRegistry snapshotRegistry;
+    public TimelineHashMap<Integer, String> map;
+    public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+    @Setup(Level.Invocation)
+    public void setup() {
+        snapshotRegistry = new SnapshotRegistry(new LogContext());
+        map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+        for (Integer key : keys) {
+            map.put(key, String.valueOf(key));
+        }
+        int count = 0;
+        for (Integer key : keys) {
+            if (count % 1_000 == 0) {
+                snapshotRegistry.deleteSnapshotsUpTo(count - 10_000);
+                snapshotRegistry.createSnapshot(count);
+            }
+            map.put(key, String.valueOf(key));
+            count++;
+        }
+        Collections.shuffle(keys);
+    }
+}

 @Benchmark
 public Map<Integer, String> testAddEntriesInHashMap() {
-    HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES);
+    HashMap<Integer, String> map = new HashMap<>();
     for (int i = 0; i < NUM_ENTRIES; i++) {
         int key = (int) (0xffffffff & ((i * 2862933555777941757L) + 3037000493L));
         map.put(key, String.valueOf(key));
     }
+    return map;
+}
+
+@Benchmark
+public scala.collection.immutable.HashMap<Integer, String> testAddEntriesInImmutableMap() {
+    scala.collection.immutable.HashMap<Integer, String> map = new scala.collection.immutable.HashMap<>();
+    for (int i = 0; i < NUM_ENTRIES; i++) {
+        int key = (int) (0xffffffff & ((i * 2862933555777941757L) + 3037000493L));
+        map = map.updated(key, String.valueOf(key));

Review comment: Good catch. It looks like we were mostly measuring converting an int to a String! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12330) FetchSessionCache may cause starvation for partitions when FetchResponse is full
[ https://issues.apache.org/jira/browse/KAFKA-12330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-12330.
    Fix Version/s: 3.0.0
       Resolution: Fixed

> FetchSessionCache may cause starvation for partitions when FetchResponse is full
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-12330
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12330
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Lucas Bradstreet
>            Assignee: David Jacot
>            Priority: Major
>             Fix For: 3.0.0
>
> The incremental FetchSessionCache deprioritizes partitions for which a response is returned. This may happen if log metadata such as log start offset, hwm, etc. is returned, or if data for that partition is returned.
> When a fetch response fills to maxBytes, data may not be returned for partitions even if the fetch offset is lower than the fetch upper bound. However, the fetch response will still contain updates to metadata such as hwm if that metadata has changed. This can lead to degenerate behavior where a partition's hwm or log start offset is updated, resulting in the next fetch being unnecessarily skipped for that partition. At first this appeared to be worse, as hwm updates occur frequently, but starvation should result in hwm movement becoming blocked, allowing a fetch to go through and then becoming unstuck. However, it'll still require one more fetch request than necessary to do so. Consumers may be affected more than replica fetchers; however, they often remove partitions with fetched data from the next fetch request, and this may be helping prevent starvation.
> I believe we should only reorder the partition fetch priority if data is actually returned for a partition.
> {noformat}
> private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
>                                 val updateFetchContextAndRemoveUnselected: Boolean)
>   extends FetchSession.RESP_MAP_ITER {
>   var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
>   override def hasNext: Boolean = {
>     while ((nextElement == null) && iter.hasNext) {
>       val element = iter.next()
>       val topicPart = element.getKey
>       val respData = element.getValue
>       val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
>       val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
>       if (mustRespond) {
>         nextElement = element
>         // Example POC change:
>         // Don't move partition to end of queue if we didn't actually fetch data
>         // This should help avoid starvation even when we are filling the fetch response fully while returning metadata for these partitions
>         if (updateFetchContextAndRemoveUnselected && respData.records != null && respData.records.sizeInBytes > 0) {
>           session.partitionMap.remove(cachedPart)
>           session.partitionMap.mustAdd(cachedPart)
>         }
>       } else {
>         if (updateFetchContextAndRemoveUnselected) {
>           iter.remove()
>         }
>       }
>     }
>     nextElement != null
>   }
> }{noformat}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rajinisivaram merged pull request #10318: KAFKA-12330; FetchSessionCache may cause starvation for partitions when FetchResponse is full
rajinisivaram merged pull request #10318: URL: https://github.com/apache/kafka/pull/10318 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on pull request #10318: KAFKA-12330; FetchSessionCache may cause starvation for partitions when FetchResponse is full
rajinisivaram commented on pull request #10318: URL: https://github.com/apache/kafka/pull/10318#issuecomment-800540196 @dajac Thanks for the PR, LGTM. Merging to trunk. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10324: MINOR: Add a few more benchmark for the timeline map
ijuma commented on pull request #10324: URL: https://github.com/apache/kafka/pull/10324#issuecomment-800538930 Btw, you can find openjdk benchmarks here: https://github.com/openjdk/jdk/tree/master/test/micro/org/openjdk/bench/java/util This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12427) Broker does not close muted idle connections with buffered data
[ https://issues.apache.org/jira/browse/KAFKA-12427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-12427. Fix Version/s: 3.0.0 Reviewer: Rajini Sivaram Resolution: Fixed > Broker does not close muted idle connections with buffered data > --- > > Key: KAFKA-12427 > URL: https://issues.apache.org/jira/browse/KAFKA-12427 > Project: Kafka > Issue Type: Bug > Components: core, network >Reporter: David Mao >Assignee: David Mao >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rajinisivaram merged pull request #10267: KAFKA-12427: Don't update connection idle time for muted connections
rajinisivaram merged pull request #10267: URL: https://github.com/apache/kafka/pull/10267 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yatsukav opened a new pull request #10333: KAFKA-12481: Add socket.nagle.disable config property
yatsukav opened a new pull request #10333: URL: https://github.com/apache/kafka/pull/10333 A large number of topic-partitions on one broker causes a burst in the host's packets/sec metric. This pull request allows disabling the TCP_NODELAY socket option via the Kafka config. A large number of topic-partitions per broker produces an enormous packet count: for example, 30k topic-partitions under load across 4 brokers generate ~150k packets/sec. With TCP_NODELAY disabled, this value drops to ~3k packets/sec. More about how to reproduce the problem, and the results after fixing it, in JIRA: https://issues.apache.org/jira/browse/KAFKA-12481
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
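For context, the change boils down to making the flag below configurable. A minimal sketch using plain NIO (the config plumbing is assumed; this is not the actual SocketServer code):

```java
import java.io.IOException;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;

// Applied to each accepted channel. tcpNoDelay = true disables Nagle's
// algorithm, so small writes go out immediately (lower latency, more packets);
// tcpNoDelay = false lets the kernel coalesce small inter-broker messages
// into far fewer packets, which is the effect measured in the JIRA.
static void configureAccepted(SocketChannel channel, boolean tcpNoDelay) throws IOException {
    channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay);
}
```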
[GitHub] [kafka] ijuma commented on a change in pull request #10324: MINOR: Add a few more benchmark for the timeline map
ijuma commented on a change in pull request #10324: URL: https://github.com/apache/kafka/pull/10324#discussion_r595472914

## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
## @@ -44,33 +49,126 @@ public class TimelineHashMapBenchmark {
 private final static int NUM_ENTRIES = 1_000_000;

+@State(Scope.Thread)
+public static class HashMapInput {
+    public HashMap<Integer, String> map;
+    public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+    @Setup(Level.Invocation)
+    public void setup() {
+        map = new HashMap<>(keys.size());
+        for (Integer key : keys) {
+            map.put(key, String.valueOf(key));
+        }
+        Collections.shuffle(keys);
+    }
+}
+
+@State(Scope.Thread)
+public static class ImmutableMapInput {
+    scala.collection.immutable.HashMap<Integer, String> map;
+    public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+    @Setup(Level.Invocation)
+    public void setup() {
+        map = new scala.collection.immutable.HashMap<>();
+        for (Integer key : keys) {
+            map = map.updated(key, String.valueOf(key));
+        }
+        Collections.shuffle(keys);
+    }
+}
+
+@State(Scope.Thread)
+public static class TimelineMapInput {
+    public SnapshotRegistry snapshotRegistry;
+    public TimelineHashMap<Integer, String> map;
+    public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+    @Setup(Level.Invocation)
+    public void setup() {
+        snapshotRegistry = new SnapshotRegistry(new LogContext());
+        map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+        for (Integer key : keys) {
+            map.put(key, String.valueOf(key));
+        }
+        Collections.shuffle(keys);
+    }
+}
+
+@State(Scope.Thread)
+public static class TimelineMapSnapshotInput {
+    public SnapshotRegistry snapshotRegistry;
+    public TimelineHashMap<Integer, String> map;
+    public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+    @Setup(Level.Invocation)
+    public void setup() {
+        snapshotRegistry = new SnapshotRegistry(new LogContext());
+        map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+        for (Integer key : keys) {
+            map.put(key, String.valueOf(key));
+        }
+        int count = 0;
+        for (Integer key : keys) {
+            if (count % 1_000 == 0) {
+                snapshotRegistry.deleteSnapshotsUpTo(count - 10_000);
+                snapshotRegistry.createSnapshot(count);
+            }
+            map.put(key, String.valueOf(key));
+            count++;
+        }
+        Collections.shuffle(keys);
+    }
+}

 @Benchmark
 public Map<Integer, String> testAddEntriesInHashMap() {
-    HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES);
+    HashMap<Integer, String> map = new HashMap<>();
     for (int i = 0; i < NUM_ENTRIES; i++) {
         int key = (int) (0xffffffff & ((i * 2862933555777941757L) + 3037000493L));
         map.put(key, String.valueOf(key));
     }
+    return map;
+}
+
+@Benchmark
+public scala.collection.immutable.HashMap<Integer, String> testAddEntriesInImmutableMap() {
+    scala.collection.immutable.HashMap<Integer, String> map = new scala.collection.immutable.HashMap<>();
+    for (int i = 0; i < NUM_ENTRIES; i++) {
+        int key = (int) (0xffffffff & ((i * 2862933555777941757L) + 3037000493L));
+        map = map.updated(key, String.valueOf(key));
+    }
+
+    return map;
+}

 @Benchmark
 public Map<Integer, String> testAddEntriesInTimelineMap() {
     SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-    TimelineHashMap<Integer, String> map =
-        new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES);
+    TimelineHashMap<Integer, String> map = new TimelineHashMap<>(snapshotRegistry, 16);
     for (int i = 0; i < NUM_ENTRIES; i++) {
         int key = (int) (0xffffffff & ((i * 2862933555777941757L) + 3037000493L));

Review comment: Hmm, I'd just generate the randoms during set-up and add them to an array. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma edited a comment on pull request #10323: KAFKA-12459; Use property testing library for raft event simulation tests
ijuma edited a comment on pull request #10323: URL: https://github.com/apache/kafka/pull/10323#issuecomment-800535044 Thanks, this is very helpful. Since this is EPL 2, we need to check https://apache.org/legal/resolved.html#weak-copyleft-licenses This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10323: KAFKA-12459; Use property testing library for raft event simulation tests
ijuma commented on pull request #10323: URL: https://github.com/apache/kafka/pull/10323#issuecomment-800535044 Since this is EPL 2, we need to check https://apache.org/legal/resolved.html#weak-copyleft-licenses This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12477) Smart rebalancing with dynamic protocol selection
[ https://issues.apache.org/jira/browse/KAFKA-12477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302833#comment-17302833 ] A. Sophie Blee-Goldman commented on KAFKA-12477: Yes, I totally agree – the second rolling bounce, while not critical, still serves as a safety net. However I think we can still implement some kind of safety measures in its place, while still encouraging those users who can afford to perform the second rolling bounce to continue doing so. For example we could enforce that the selected protocol can only ever increase, and throw a fatal exception if the consumer has been using the COOPERATIVE protocol and suddenly they receive the RangeAssignor from a rebalance which only supports EAGER. > Smart rebalancing with dynamic protocol selection > - > > Key: KAFKA-12477 > URL: https://issues.apache.org/jira/browse/KAFKA-12477 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Priority: Major > Fix For: 3.0.0 > > > Users who want to upgrade their applications and enable the COOPERATIVE > rebalancing protocol in their consumer apps are required to follow a double > rolling bounce upgrade path. The reason for this is laid out in the [Consumer > Upgrades|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-Consumer] > section of KIP-429. Basically, the ConsumerCoordinator picks a rebalancing > protocol in its constructor based on the list of supported partition > assignors. The protocol is selected as the highest protocol that is commonly > supported by all assignors in the list, and never changes after that. > This is a bit unfortunate because it may end up using an older protocol even > after every member in the group has been updated to support the newer > protocol. After the first rolling bounce of the upgrade, all members will > have two assignors: "cooperative-sticky" and "range" (or > sticky/round-robin/etc). At this point the EAGER protocol will still be > selected due to the presence of the "range" assignor, but it's the > "cooperative-sticky" assignor that will ultimately be selected for use in > rebalances if that assignor is preferred (ie positioned first in the list). > The only reason for the second rolling bounce is to strip off the "range" > assignor and allow the upgraded members to switch over to COOPERATIVE. We > can't allow them to use cooperative rebalancing until everyone has been > upgraded, but once they have it's safe to do so. > And there is already a way for the client to detect that everyone is on the > new byte code: if the CooperativeStickyAssignor is selected by the group > coordinator, then that means it is supported by all consumers in the group > and therefore everyone must be upgraded. > We may be able to save the second rolling bounce by dynamically updating the > rebalancing protocol inside the ConsumerCoordinator as "the highest protocol > supported by the assignor chosen by the group coordinator". This means we'll > still be using EAGER at the first rebalance, since we of course need to wait > for this initial rebalance to get the response from the group coordinator. > But we should take the hint from the chosen assignor rather than dropping > this information on the floor and sticking with the original protocol -- This message was sent by Atlassian Jira (v8.3.4#803005)
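A minimal sketch of the safety net described above, assuming the check lives wherever the protocol selection lands (a hypothetical method, not actual ConsumerCoordinator code):

```java
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;

// Once the group has selected an assignor supporting COOPERATIVE, a later
// selection that only supports EAGER indicates a misconfigured or
// not-yet-upgraded member joined, so we fail fast instead of downgrading.
static RebalanceProtocol validateSelected(RebalanceProtocol current, RebalanceProtocol selected) {
    if (selected.id() < current.id()) {
        throw new IllegalStateException("Rebalance protocol downgrade from "
                + current + " to " + selected + " is not allowed");
    }
    return selected; // the protocol may only stay the same or increase
}
```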
[GitHub] [kafka] jsancio commented on a change in pull request #10324: MINOR: Add a few more benchmark for the timeline map
jsancio commented on a change in pull request #10324: URL: https://github.com/apache/kafka/pull/10324#discussion_r595464237

## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
## @@ -44,33 +49,126 @@ public class TimelineHashMapBenchmark {
 private final static int NUM_ENTRIES = 1_000_000;

+@State(Scope.Thread)
+public static class HashMapInput {
+    public HashMap<Integer, String> map;
+    public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+    @Setup(Level.Invocation)
+    public void setup() {
+        map = new HashMap<>(keys.size());
+        for (Integer key : keys) {
+            map.put(key, String.valueOf(key));
+        }
+        Collections.shuffle(keys);
+    }
+}
+
+@State(Scope.Thread)
+public static class ImmutableMapInput {
+    scala.collection.immutable.HashMap<Integer, String> map;
+    public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+    @Setup(Level.Invocation)
+    public void setup() {
+        map = new scala.collection.immutable.HashMap<>();
+        for (Integer key : keys) {
+            map = map.updated(key, String.valueOf(key));
+        }
+        Collections.shuffle(keys);
+    }
+}
+
+@State(Scope.Thread)
+public static class TimelineMapInput {
+    public SnapshotRegistry snapshotRegistry;
+    public TimelineHashMap<Integer, String> map;
+    public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+    @Setup(Level.Invocation)
+    public void setup() {
+        snapshotRegistry = new SnapshotRegistry(new LogContext());
+        map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+        for (Integer key : keys) {
+            map.put(key, String.valueOf(key));
+        }
+        Collections.shuffle(keys);
+    }
+}
+
+@State(Scope.Thread)
+public static class TimelineMapSnapshotInput {
+    public SnapshotRegistry snapshotRegistry;
+    public TimelineHashMap<Integer, String> map;
+    public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+    @Setup(Level.Invocation)
+    public void setup() {
+        snapshotRegistry = new SnapshotRegistry(new LogContext());
+        map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+        for (Integer key : keys) {
+            map.put(key, String.valueOf(key));
+        }
+        int count = 0;
+        for (Integer key : keys) {
+            if (count % 1_000 == 0) {
+                snapshotRegistry.deleteSnapshotsUpTo(count - 10_000);
+                snapshotRegistry.createSnapshot(count);
+            }
+            map.put(key, String.valueOf(key));
+            count++;
+        }
+        Collections.shuffle(keys);
+    }
+}

 @Benchmark
 public Map<Integer, String> testAddEntriesInHashMap() {
-    HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES);
+    HashMap<Integer, String> map = new HashMap<>();
     for (int i = 0; i < NUM_ENTRIES; i++) {
         int key = (int) (0xffffffff & ((i * 2862933555777941757L) + 3037000493L));
         map.put(key, String.valueOf(key));
     }
+    return map;
+}
+
+@Benchmark
+public scala.collection.immutable.HashMap<Integer, String> testAddEntriesInImmutableMap() {
+    scala.collection.immutable.HashMap<Integer, String> map = new scala.collection.immutable.HashMap<>();
+    for (int i = 0; i < NUM_ENTRIES; i++) {
+        int key = (int) (0xffffffff & ((i * 2862933555777941757L) + 3037000493L));
+        map = map.updated(key, String.valueOf(key));
+    }
+
+    return map;
+}

 @Benchmark
 public Map<Integer, String> testAddEntriesInTimelineMap() {
     SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-    TimelineHashMap<Integer, String> map =
-        new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES);
+    TimelineHashMap<Integer, String> map = new TimelineHashMap<>(snapshotRegistry, 16);
     for (int i = 0; i < NUM_ENTRIES; i++) {
         int key = (int) (0xffffffff & ((i * 2862933555777941757L) + 3037000493L));

Review comment: I think this is an algorithm for generating pseudo-random numbers; it seems to relate to https://nuclear.llnl.gov/CNP/rng/rngman/node4.html. If so, let me fix the expression, as it is supposed to multiply by `key`, not `i`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
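For reference, read as a linear congruential generator (assuming the LLNL reference above applies to these constants), the recurrence chains on the previous value rather than the loop index, which is what "multiply by `key`, not `i`" amounts to:

```java
// x_{n+1} = a * x_n + c (mod 2^64), with a = 2862933555777941757 and
// c = 3037000493; Java long arithmetic wraps, giving the mod 2^64 for free.
long state = 0;
int[] keys = new int[1_000_000];
for (int n = 0; n < keys.length; n++) {
    state = state * 2862933555777941757L + 3037000493L; // advance the generator
    keys[n] = (int) state;                              // low 32 bits become the key
}
```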
[GitHub] [kafka] rondagostino commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500
rondagostino commented on a change in pull request #10227: URL: https://github.com/apache/kafka/pull/10227#discussion_r595456197 ## File path: KIP-500.md ## @@ -0,0 +1,131 @@ +KIP-500 Early Access Release + + +# Introduction +It is now possible to run Apache Kafka without Apache ZooKeeper! We call this mode [self-managed mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum). It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it is available for testing in the Kafka 2.8 release. + +When the Kafka cluster is in self-managed mode, it does not store its metadata in ZooKeeper. In fact, you do not have to run ZooKeeper at all, because it stores its metadata in a Raft quorum of controller nodes. + +Self-managed mode has many benefits -- some obvious, and some not so obvious. Clearly, it is nice to manage and configure one service rather than two services. In addition, you can now run a single process Kafka cluster. Most important of all, self-managed mode is more scalable. We expect to be able to [support many more topics and partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/) in this mode. + +# Quickstart + +## Warning +Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for production. We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode. In fact, when Kafka 3.0 is released, it may not even be possible to upgrade your self-managed clusters from 2.8 to 3.0 without downtime. There may be bugs, including serious ones. You should *assume that your data could be lost at any time* if you try the early access release of KIP-500. Review comment: > it may not even be possible to upgrade your self-managed clusters from 2.8 to 3.0 without downtime. I think this statement as currently worded implies that we are committing to supporting an upgrade (potentially with some downtime). We should drop the qualifier "without downtime" at the end if we are not committing to that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10321: HOTFIX: timeout issue in Remove thread
ableegoldman commented on pull request #10321: URL: https://github.com/apache/kafka/pull/10321#issuecomment-800516409 Cherrypicked to 2.8 cc @vvcephei This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tang7526 opened a new pull request #10332: KAFKA-10697: Remove ProduceResponse.responses
tang7526 opened a new pull request #10332: URL: https://github.com/apache/kafka/pull/10332 issue: https://issues.apache.org/jira/browse/KAFKA-10697
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10697) Remove ProduceResponse.responses
[ https://issues.apache.org/jira/browse/KAFKA-10697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chun-Hao Tang reassigned KAFKA-10697: - Assignee: Chun-Hao Tang (was: UnityLung) > Remove ProduceResponse.responses > > > Key: KAFKA-10697 > URL: https://issues.apache.org/jira/browse/KAFKA-10697 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chun-Hao Tang >Priority: Minor > > This is a follow-up of KAFKA-9628. > related discussion: > https://github.com/apache/kafka/pull/9401#discussion_r518984349 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on pull request #10321: HOTFIX: timeout issue in Remove thread
vvcephei commented on pull request #10321: URL: https://github.com/apache/kafka/pull/10321#issuecomment-800501385 Thanks @wcarlson5. Yes, please cherry-pick it to 2.8. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12481) Add socket.nagle.disable config to reduce number of packets
[ https://issues.apache.org/jira/browse/KAFKA-12481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrei Iatsuk updated KAFKA-12481: -- Description:
*What to do?*
Add a _socket.nagle.disable_ parameter to the Apache Kafka config, like in [librdkafka|https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md].

*What is the reason for this improvement?*
A large number of topic-partitions on one broker causes a burst in the host's packets/sec metric. The traffic shaper in the cloud stops coping with such a load, and the service degrades.

*How to reproduce?*
# Create a Kafka cluster with 4 brokers. Packets/sec is ~120.
# Add 100 topics with 100 partitions each and replication factor = 3, i.e. 30k topic-partitions in total. Packets/sec is ~15k.
{code:java}
import os

for i in range(100):
    print(f"create topic 'flower{i}'... ", end="")
    cmd = "kafka-topics.sh --create --bootstrap-server {} --topic {} --partitions {} --replication-factor {}".format(
        "databus.andrei-iatsuk.ec.odkl.ru:9092", f"flower{i}", 100, 3)
    code = os.system(cmd)
    print("ok" if code == 0 else "error")
{code}
!Screenshot 2021-03-16 at 21.05.03.png!
# Generate server load by launching the following script in 4 terminals. Packets/sec is ~130k.
{code:java}
import time
from pykafka import KafkaClient

client = KafkaClient(hosts="databus.andrei-iatsuk.ec.odkl.ru:9092")
while True:
    for i in range(100):
        print(f"sent message to 'flower{i}'")
        with client.topics[f"flower{i}"].get_sync_producer() as producer:
            for j in range(1000):
                producer.produce(str.encode(f'test message {j} in topic flower{i}' * 10))
{code}
!Screenshot 2021-03-13 at 00.44.43.png! !Screenshot 2021-03-13 at 00.29.10.png!
# Dump the TCP traffic via tcpdump for ~2 sec:
{code:java}
$ tcpdump -i eth1 -w dump.pcap
tcpdump: listening on eth1, link-type EN10MB (Ethernet), capture size 262144 bytes
^C8873886 packets captured
9139050 packets received by filter
265028 packets dropped by kernel
{code}
# Load the dump into Wireshark and look at the statistics: ~99.999% of the packets are inter-broker messages with sizes of 40-160 bytes. In the screenshots, the hosts with IPs 10.16.23.[157-160] are the brokers:
!Screenshot 2021-03-14 at 01.46.00.png! !Screenshot 2021-03-14 at 01.52.01.png!

*How to fix?*
# Add a boolean _socket.nagle.disable_ parameter to the Apache Kafka config and pass its value to the kafka.network.Acceptor.accept(key) method in: [https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/network/SocketServer.scala#L646]
# With TCP_NODELAY disabled:
## ~400 packets/s for an idle broker (instead of ~12k packets/s)
## ~3k packets/s for a loaded broker (instead of ~150k packets/s)
!Screenshot 2021-03-16 at 21.12.17.png!

was:
*What to do?*
Add a _socket.nagle.disable_ parameter to the Apache Kafka config, like in [librdkafka|https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md].

*What is the reason for this improvement?*
A large number of topic-partitions on one broker causes a burst in the host's packets/sec metric. The traffic shaper in the cloud stops coping with such a load, and the service degrades.

*How to reproduce?*
# Create a Kafka cluster with 4 brokers. Packets/sec is ~120.
# Add 100 topics with 100 partitions each and replication factor = 3, i.e. 30k topic-partitions in total. Packets/sec is ~15k.
{code:java}
import os

for i in range(100):
    print(f"create topic 'flower{i}'... ", end="")
    cmd = "kafka-topics.sh --create --bootstrap-server {} --topic {} --partitions {} --replication-factor {}".format(
        "databus.andrei-iatsuk.ec.odkl.ru:9092", f"flower{i}", 100, 3)
    code = os.system(cmd)
    print("ok" if code == 0 else "error")
{code}
!Screenshot 2021-03-16 at 21.05.03.png!
# Generate server load by launching the following script in 4 terminals. Packets/sec is ~130k.
{code:java}
import time
from pykafka import KafkaClient

client = KafkaClient(hosts="databus.andrei-iatsuk.ec.odkl.ru:9092")
while True:
    for i in range(100):
        print(f"sent message to 'flower{i}'")
        with client.topics[f"flower{i}"].get_sync_producer() as producer:
            for j in range(1000):
                producer.produce(str.encode(f'test message {j} in topic flower{i}' * 10))
{code}
!Screenshot 2021-03-13 at 00.44.43.png! !Screenshot 2021-03-13 at 00.29.10.png!
# Dump the TCP traffic via tcpdump for ~2 sec:
{code:java}
$ tcpdump -i eth1 -w dump.pcap
tcpdump: listening on eth1, link-type EN10MB (Ethernet), capture size 262144 bytes
^C8873886 packets captured
9139050 packets received by filter
265028 packets dropped by kernel
{code}
# Load the dump into Wireshark and look at the statistics: ~99.999% of the packets are inter-broker messages with sizes of 40-160 bytes. In the screenshots, the hosts with IPs 10.16.23.[157-160] are the brokers:
!Screenshot 2021-03-14 at 01.46.00.png! !Screenshot 2021-03-14 at 01.52.01.png!

*How to fix?*
# Add boolean _socket.nagle.disable_ parameter to Apache Kafka
[jira] [Created] (KAFKA-12481) Add socket.nagle.disable config to reduce number of packets
Andrei Iatsuk created KAFKA-12481: - Summary: Add socket.nagle.disable config to reduce number of packets Key: KAFKA-12481 URL: https://issues.apache.org/jira/browse/KAFKA-12481 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 2.6.1, 2.7.0, 2.5.1, 2.4.1, 2.3.1, 2.2.2, 2.1.1, 2.0.1, 1.1.1, 1.0.2, 0.11.0.3, 0.10.2.2, 0.8.2.2 Reporter: Andrei Iatsuk Attachments: Screenshot 2021-03-13 at 00.29.10.png, Screenshot 2021-03-13 at 00.44.43.png, Screenshot 2021-03-14 at 01.46.00.png, Screenshot 2021-03-14 at 01.52.01.png, Screenshot 2021-03-16 at 21.05.03.png, Screenshot 2021-03-16 at 21.12.17.png
*What to do?*
Add a _socket.nagle.disable_ parameter to the Apache Kafka config, like in [librdkafka|https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md].

*What is the reason for this improvement?*
A large number of topic-partitions on one broker causes a burst in the host's packets/sec metric. The traffic shaper in the cloud stops coping with such a load, and the service degrades.

*How to reproduce?*
# Create a Kafka cluster with 4 brokers. Packets/sec is ~120.
# Add 100 topics with 100 partitions each and replication factor = 3, i.e. 30k topic-partitions in total. Packets/sec is ~15k.
{code:java}
import os

for i in range(100):
    print(f"create topic 'flower{i}'... ", end="")
    cmd = "kafka-topics.sh --create --bootstrap-server {} --topic {} --partitions {} --replication-factor {}".format(
        "databus.andrei-iatsuk.ec.odkl.ru:9092", f"flower{i}", 100, 3)
    code = os.system(cmd)
    print("ok" if code == 0 else "error")
{code}
!Screenshot 2021-03-16 at 21.05.03.png!
# Generate server load by launching the following script in 4 terminals. Packets/sec is ~130k.
{code:java}
import time
from pykafka import KafkaClient

client = KafkaClient(hosts="databus.andrei-iatsuk.ec.odkl.ru:9092")
while True:
    for i in range(100):
        print(f"sent message to 'flower{i}'")
        with client.topics[f"flower{i}"].get_sync_producer() as producer:
            for j in range(1000):
                producer.produce(str.encode(f'test message {j} in topic flower{i}' * 10))
{code}
!Screenshot 2021-03-13 at 00.44.43.png! !Screenshot 2021-03-13 at 00.29.10.png!
# Dump the TCP traffic via tcpdump for ~2 sec:
{code:java}
$ tcpdump -i eth1 -w dump.pcap
tcpdump: listening on eth1, link-type EN10MB (Ethernet), capture size 262144 bytes
^C8873886 packets captured
9139050 packets received by filter
265028 packets dropped by kernel
{code}
# Load the dump into Wireshark and look at the statistics: ~99.999% of the packets are inter-broker messages with sizes of 40-160 bytes. In the screenshots, the hosts with IPs 10.16.23.[157-160] are the brokers:
!Screenshot 2021-03-14 at 01.46.00.png! !Screenshot 2021-03-14 at 01.52.01.png!

*How to fix?*
# Add a boolean _socket.nagle.disable_ parameter to the Apache Kafka config and pass its value to the kafka.network.Acceptor.accept(key) method in: [https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/network/SocketServer.scala#L646]
# With TCP_NODELAY disabled:
## ~400 packets/s for an idle broker (instead of ~12k packets/s)
## ~3k packets/s for a loaded broker (instead of ~150k packets/s)
!Screenshot 2021-03-16 at 21.12.17.png!
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] stanislavkozlovski commented on pull request #10323: KAFKA-12459; Use property testing library for raft event simulation tests
stanislavkozlovski commented on pull request #10323: URL: https://github.com/apache/kafka/pull/10323#issuecomment-800488343 For what it's worth, I evaluated jqwik vs Quickcheck and had the following bullet-point summaries:

### Quickcheck
- 819 stars
- 10-year-old library
- v1.0 released Nov 22, 2020
- 25 open issues
- 55 commits in the last year
- MIT License
- light documentation
- supports shrinking

### Jqwik
- 260 stars
- 5-year-old library
- v1.5 released Feb 2021
- 16 open issues
- 1039 commits in the last year
- EPL-2.0 License
- extensive documentation
- very configurable
- supports shrinking

My reasoning for preferring jqwik was that it seemed more actively maintained, had good interfaces, had very extensive documentation (I value this heavily) and, most importantly, supports programmatic parameter generation, meaning it allows you to easily express the dependencies of randomized input. I got the notion that this random input dependency generation is one of the trickier things when writing more complex test cases from [this blog post](https://www.leadingagile.com/2018/04/step-by-step-toward-property-based-testing/). Jqwik has some other interesting features, like [collecting and reporting statistics](https://jqwik.net/docs/current/user-guide.html#collecting-and-reporting-statistics) on the data it generates, allowing you to inspect what the generated data is and whether it's useful or can be tweaked. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
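For readers unfamiliar with jqwik, a minimal property (assumes the `net.jqwik:jqwik` test dependency). The property is deliberately wrong for `Integer.MIN_VALUE`, which jqwik's edge-case generation finds and reports as the shrunk counterexample:

```java
import net.jqwik.api.ForAll;
import net.jqwik.api.Property;

class AbsProperties {
    // False for Integer.MIN_VALUE, since Math.abs(Integer.MIN_VALUE)
    // overflows and stays negative; jqwik generates it as an edge case.
    @Property
    boolean absIsNonNegative(@ForAll int n) {
        return Math.abs(n) >= 0;
    }
}
```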
[GitHub] [kafka] chia7712 commented on a change in pull request #10318: KAFKA-12330; FetchSessionCache may cause starvation for partitions when FetchResponse is full
chia7712 commented on a change in pull request #10318: URL: https://github.com/apache/kafka/pull/10318#discussion_r595414671

## File path: core/src/main/scala/kafka/server/FetchSession.scala
## @@ -428,7 +428,7 @@ class IncrementalFetchContext(private val time: Time,
 val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
 if (mustRespond) {
   nextElement = element
-  if (updateFetchContextAndRemoveUnselected) {
+  if (updateFetchContextAndRemoveUnselected && FetchResponse.recordsSize(respData) > 0) {

Review comment: Thanks for the nice explanation! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org