[GitHub] [kafka] chia7712 commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-16 Thread GitBox


chia7712 commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r595727918



##
File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
##
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.admin

Review comment:
   the package name should be `kafka.admin` rather than `unit.kafka.admin`

##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,30 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+try {
+val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+case Some(brokerListStr) =>
+val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+(inputBrokers, inputBrokers.diff(clusterBrokers))

Review comment:
   As the variable is called `existingBrokers`, we should find the "true" existing brokers. In short, it should return `inputBrokers.intersect(clusterBrokers)` rather than `inputBrokers`.
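   
   For illustration only, here is a minimal Java sketch of the suggested set split (the PR code itself is Scala); the broker ids below are made up:
   ```java
   import java.util.HashSet;
   import java.util.Set;
   
   public class BrokerSplitSketch {
       public static void main(String[] args) {
           // Hypothetical broker ids passed via --broker-list.
           Set<Integer> inputBrokers = new HashSet<>(Set.of(1, 2, 5));
           // Hypothetical broker ids reported by describeCluster().
           Set<Integer> clusterBrokers = new HashSet<>(Set.of(1, 2, 3));
   
           // "True" existing brokers: the intersection, as suggested above.
           Set<Integer> existingBrokers = new HashSet<>(inputBrokers);
           existingBrokers.retainAll(clusterBrokers);    // {1, 2}
   
           // Requested brokers that are not part of the cluster.
           Set<Integer> nonExistingBrokers = new HashSet<>(inputBrokers);
           nonExistingBrokers.removeAll(clusterBrokers); // {5}
   
           System.out.println(existingBrokers + " / " + nonExistingBrokers);
       }
   }
   ```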





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #10339: MINOR: Remove redundant allows in import-control.xml

2021-03-16 Thread GitBox


dongjinleekr commented on pull request #10339:
URL: https://github.com/apache/kafka/pull/10339#issuecomment-800808391


   @dajac @rondagostino @ijuma Could you have a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr opened a new pull request #10339: MINOR: Remove redundant allows in import-control.xml

2021-03-16 Thread GitBox


dongjinleekr opened a new pull request #10339:
URL: https://github.com/apache/kafka/pull/10339


   I found this problem while working on [KIP-719: Add Log4J2 Appender](https://cwiki.apache.org/confluence/display/KAFKA/KIP-719%3A+Add+Log4J2+Appender).
   
   1. `org.apache.log4j` does not need to be allowed in the shell and trogdor subpackages; they use `slf4j`, not `log4j`.
   2. `org.slf4j` does not need to be allowed in the clients and server subpackages: `org.slf4j` is allowed globally.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #10338: KAFKA-10251: wait for consumer rebalance completed before consuming records

2021-03-16 Thread GitBox


showuon opened a new pull request #10338:
URL: https://github.com/apache/kafka/pull/10338


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-16 Thread GitBox


wenbingshen commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-800805388


   Good afternoon @chia7712 @dajac. Dear committers, if you have any comments on this PR, I will continue to improve it. If you are satisfied with it, may I ask that this PR be merged into trunk? :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12384) Flaky Test ListOffsetsRequestTest.testResponseIncludesLeaderEpoch

2021-03-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-12384:
-

Assignee: Luke Chen  (was: Kamal Chandraprakash)

> Flaky Test ListOffsetsRequestTest.testResponseIncludesLeaderEpoch
> -
>
> Key: KAFKA-12384
> URL: https://issues.apache.org/jira/browse/KAFKA-12384
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
> {quote}org.opentest4j.AssertionFailedError: expected: <(0,0)> but was: 
> <(-1,-1)> at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at 
> org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at 
> kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch(ListOffsetsRequestTest.scala:172){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on pull request #10324: MINOR: Add a few more benchmark for the timeline map

2021-03-16 Thread GitBox


jsancio commented on pull request #10324:
URL: https://github.com/apache/kafka/pull/10324#issuecomment-800747908


   @cmccabe the `testGetEntries` benchmark throws an exception when used against a snapshot:
   ```
   java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0
   at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
   at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
   at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
   at java.base/java.util.Objects.checkIndex(Objects.java:372)
   at java.base/java.util.ArrayList.get(ArrayList.java:459)
   at org.apache.kafka.jmh.timeline.TimelineHashMapBenchmark.testGetEntries(TimelineHashMapBenchmark.java:221)
   at org.apache.kafka.jmh.timeline.jmh_generated.TimelineHashMapBenchmark_testGetEntries_jmhTest.testGetEntries_avgt_jmhStub(TimelineHashMapBenchmark_testGetEntries_jmhTest.java:246)
   at org.apache.kafka.jmh.timeline.jmh_generated.TimelineHashMapBenchmark_testGetEntries_jmhTest.testGetEntries_AverageTime(TimelineHashMapBenchmark_testGetEntries_jmhTest.java:183)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:453)
   at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:437)
   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   at java.base/java.lang.Thread.run(Thread.java:834)
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on pull request #10324: MINOR: Add a few more benchmark for the timeline map

2021-03-16 Thread GitBox


jsancio commented on pull request #10324:
URL: https://github.com/apache/kafka/pull/10324#issuecomment-800747379


   @ijuma Ready for review. I changed the benchmark structure to remove 
duplicate code.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10324: MINOR: Add a few more benchmark for the timeline map

2021-03-16 Thread GitBox


jsancio commented on a change in pull request #10324:
URL: https://github.com/apache/kafka/pull/10324#discussion_r595670643



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
##
@@ -44,33 +49,126 @@
 public class TimelineHashMapBenchmark {
 private final static int NUM_ENTRIES = 1_000_000;
 
+@State(Scope.Thread)
+public static class HashMapInput {
+public HashMap map;
+public final List keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+map = new HashMap<>(keys.size());
+for (Integer key : keys) {
+map.put(key, String.valueOf(key));
+}
+
+Collections.shuffle(keys);
+}
+}
+
+@State(Scope.Thread)
+public static class ImmutableMapInput {
+scala.collection.immutable.HashMap map;
+public final List keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+map = new scala.collection.immutable.HashMap<>();
+for (Integer key : keys) {
+map = map.updated(key, String.valueOf(key));
+}
+
+Collections.shuffle(keys);
+}
+}
+
+@State(Scope.Thread)
+public static class TimelineMapInput {
+public SnapshotRegistry snapshotRegistry;
+public TimelineHashMap map;
+public final List keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+snapshotRegistry = new SnapshotRegistry(new LogContext());
+map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+
+for (Integer key : keys) {
+map.put(key, String.valueOf(key));
+}
+
+Collections.shuffle(keys);
+}
+}
+
+@State(Scope.Thread)
+public static class TimelineMapSnapshotInput {
+public SnapshotRegistry snapshotRegistry;
+public TimelineHashMap map;
+public final List keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+snapshotRegistry = new SnapshotRegistry(new LogContext());
+map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+
+for (Integer key : keys) {
+map.put(key, String.valueOf(key));
+}
+
+int count = 0;
+for (Integer key : keys) {
+if (count % 1_000 == 0) {
+snapshotRegistry.deleteSnapshotsUpTo(count - 10_000);
+snapshotRegistry.createSnapshot(count);
+}
+map.put(key, String.valueOf(key));
+count++;
+}
+
+Collections.shuffle(keys);
+}
+}
+
+
 @Benchmark
 public Map testAddEntriesInHashMap() {
-HashMap map = new HashMap<>(NUM_ENTRIES);
+HashMap map = new HashMap<>();
 for (int i = 0; i < NUM_ENTRIES; i++) {
 int key = (int) (0x & ((i * 2862933555777941757L) + 3037000493L));
 map.put(key, String.valueOf(key));
 }
+
+return map;
+}
+
+@Benchmark
+public scala.collection.immutable.HashMap testAddEntriesInImmutableMap() {
+scala.collection.immutable.HashMap map = new scala.collection.immutable.HashMap<>();
+for (int i = 0; i < NUM_ENTRIES; i++) {
+int key = (int) (0x & ((i * 2862933555777941757L) + 3037000493L));
+map = map.updated(key, String.valueOf(key));
+}
+
 return map;
 }
 
 @Benchmark
 public Map testAddEntriesInTimelineMap() {
 SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-TimelineHashMap map =
-new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES);
+TimelineHashMap map = new TimelineHashMap<>(snapshotRegistry, 16);
 for (int i = 0; i < NUM_ENTRIES; i++) {
 int key = (int) (0x & ((i * 2862933555777941757L) + 3037000493L));

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sridhav commented on pull request #10337: KAFKA-12380: Executor in Connect's Worker is not shut down when the worker is

2021-03-16 Thread GitBox


sridhav commented on pull request #10337:
URL: https://github.com/apache/kafka/pull/10337#issuecomment-800746746


   @rhauch can you please review?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on pull request #10324: MINOR: Add a few more benchmark for the timeline map

2021-03-16 Thread GitBox


jsancio commented on pull request #10324:
URL: https://github.com/apache/kafka/pull/10324#issuecomment-800745760


   Test result after fixing the benchmark tests:
   ```
   Benchmark                                    (mapType)              (size)  Mode  Cnt    Score    Error  Units
   TimelineHashMapBenchmark.testAddEntries      HASH_MAP               100     avgt   10  184.183 ± 12.318  ms/op
   TimelineHashMapBenchmark.testAddEntries      SCALA_HASH_MAP         100     avgt   10  350.935 ±  4.801  ms/op
   TimelineHashMapBenchmark.testAddEntries      TIMELINE_MAP           100     avgt   10  340.839 ± 15.397  ms/op
   TimelineHashMapBenchmark.testAddEntries      TIMELINE_SNAPSHOT_MAP  100     avgt   10  332.535 ± 36.350  ms/op
   TimelineHashMapBenchmark.testGetEntries      HASH_MAP               100     avgt   10   37.772 ±  4.717  ms/op
   TimelineHashMapBenchmark.testGetEntries      SCALA_HASH_MAP         100     avgt   10  248.350 ±  4.445  ms/op
   TimelineHashMapBenchmark.testGetEntries      TIMELINE_MAP           100     avgt   10   83.487 ±  6.952  ms/op
   TimelineHashMapBenchmark.testIterateEntries  HASH_MAP               100     avgt   10   42.743 ±  1.184  ms/op
   TimelineHashMapBenchmark.testIterateEntries  SCALA_HASH_MAP         100     avgt   10   36.030 ±  0.937  ms/op
   TimelineHashMapBenchmark.testIterateEntries  TIMELINE_MAP           100     avgt   10   54.760 ±  2.866  ms/op
   TimelineHashMapBenchmark.testRemoveEntries   HASH_MAP               100     avgt   10   26.246 ±  1.141  ms/op
   TimelineHashMapBenchmark.testRemoveEntries   SCALA_HASH_MAP         100     avgt   10  430.861 ± 13.864  ms/op
   TimelineHashMapBenchmark.testRemoveEntries   TIMELINE_MAP           100     avgt   10   79.832 ± 12.833  ms/op
   TimelineHashMapBenchmark.testRemoveEntries   TIMELINE_SNAPSHOT_MAP  100     avgt   10  185.170 ± 13.464  ms/op
   TimelineHashMapBenchmark.testUpdateEntries   HASH_MAP               100     avgt   10   84.963 ± 10.411  ms/op
   TimelineHashMapBenchmark.testUpdateEntries   SCALA_HASH_MAP         100     avgt   10  426.490 ±  6.468  ms/op
   TimelineHashMapBenchmark.testUpdateEntries   TIMELINE_MAP           100     avgt   10  160.341 ± 13.799  ms/op
   TimelineHashMapBenchmark.testUpdateEntries   TIMELINE_SNAPSHOT_MAP  100     avgt   10  300.875 ± 35.965  ms/op
   JMH benchmarks done
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

2021-03-16 Thread GitBox


chia7712 commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-800745691


   > is there any work in progress for a KafkaApis.handleFetchRequest test? I 
suspect it would be similar but maybe a bit harder than what I did for the 
LeaderAndIsr version #10071 (trading replicamanager for fetchmanager, etc). 
This benchmark would be helpful for #9944 as you could probably guess :)
   
   this PR is blocked by #9944. This PR (and other related issues) aims to remove all the extra collection creation by using auto-generated data. In #9944 we have to create a lot of collections to handle the topic id in the fetch request. Hence, I need to rethink the value (and approach) of this PR :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sridhav opened a new pull request #10337: KAFKA-12380: Executor in Connect's Worker is not shut down when the worker is

2021-03-16 Thread GitBox


sridhav opened a new pull request #10337:
URL: https://github.com/apache/kafka/pull/10337


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   When the worker is stopped, it does not shutdown this executor. 
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   The following tests are run:
   * `./gradlew connect:test`
   * `./gradlew connect:unitTest`
   * `./gradlew connect:integrationTest`
   
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10292: MINOR: fix client_compatibility_features_test.py

2021-03-16 Thread GitBox


chia7712 commented on pull request #10292:
URL: https://github.com/apache/kafka/pull/10292#issuecomment-800743083


   @cmccabe @rondagostino thanks for all your comments (and the nice explanations)! I have updated the code according to the reviews. Please take a look.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10292: MINOR: fix client_compatibility_features_test.py

2021-03-16 Thread GitBox


chia7712 commented on a change in pull request #10292:
URL: https://github.com/apache/kafka/pull/10292#discussion_r595666144



##
File path: tests/kafkatest/tests/client/client_compatibility_features_test.py
##
@@ -81,8 +81,12 @@ def __init__(self, test_context):
 self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics=self.topics)
 
 def invoke_compatibility_program(self, features):
-# Run the compatibility test on the first Kafka node.
-node = self.kafka.nodes[0]
+if self.zk:
+# kafka nodes are set to older version so resolved script path is linked to older assembly.
+# run the compatibility test on the first zk node to get script path linked to latest(dev) assembly.
+node = self.zk.nodes[0]
+else:
+node = self.kafka.nodes[0]

Review comment:
   copy that





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tang7526 commented on pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

2021-03-16 Thread GitBox


tang7526 commented on pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#issuecomment-800730712


   @chia7712 Could you help review this PR?  Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302981#comment-17302981
 ] 

Chris Egerton edited comment on KAFKA-12463 at 3/17/21, 1:34 AM:
-

Fun fact--Connect does not like cooperative consumers. See 
https://issues.apache.org/jira/browse/KAFKA-12487 for details.

Until/unless that's addressed, we can't change the default partitioner for 
Connect to the {{CooperativeStickyAssignor}}.

 

[~rhauch] thanks for the rewrite, I hope that makes things easier to read. 
Responses inline:
{quote}Specifically, isn't it true that after the Connect worker config is 
changed, one rolling restart of the Connect workers will not immediately use 
the cooperative assignor (since the last worker to restart will only use the 
range assignor (the default), and that the cooperative assignor will only be 
used after a second rolling restart.
{quote}
That's true if the second rolling restart includes a change to the partition 
assignment strategy for its consumers to only use the 
{{CooperativeStickyAssignor}} and remove the {{RangeAssignor}} from the list, 
yes. Given that and the recently-discovered KAFKA-12487 , I don't even think 
we're at the point of updating the description yet to mention an upgrade 
process that would accommodate the {{CooperativeStickyAssignor}} without having 
a warning notice preceding any hypothetical designs we might implement once 
both of these pain points are addressed. I've updated the description 
accordingly, feel free to make any edits as long as we don't actually instruct 
users how to configure their workers with the {{CooperativeStickyAssignor}} as 
that will lead to bad worker behavior.
{quote}In both standalone and distributed, Connect only updates the task 
configs if and only if the task configs changed. If a user *only* changes the 
{{consumer.override.partition.assignment.strategy}} property in a connector 
config, then the task configs will not be changed and the tasks will not be 
restarted.
{quote}
Ah yes, KAFKA-9228. When I initially discovered that one I was a little alarmed, but it turns out that the overwhelming majority of connectors out there are unaffected, simply by a quirk of how people tend to implement {{Connector::taskConfigs}}. The only cases I've been able to find where this bug comes up are in the file stream connectors. If we believe this is likely to affect other connectors, I personally think we should be addressing that bug instead of working around it or documenting it as a potential gotcha.
{quote}If Connect does attempt to restart the connector's tasks, the herder 
does not wait for the tasks to stop before starting any of them. This means 
that it may take several such connector & task restarts before the cooperative 
partition assignment strategy will take effect.
{quote}
That's a fair point, and it applies to any change of partition assignment 
strategy and not just specifically moving from an eager to a cooperative one. 
This becomes especially likely if a task isn't able to respond to a shutdown 
request within the graceful shutdown period (which defaults to five seconds). 
The workaround here is to enable both partition assignment strategies for the 
consumer with a preference for the desired strategy; that way, the desired 
strategy will take effect as soon as every consumer in the group has been 
updated, and nobody will break beforehand. I'll update the workaround section 
in the description to include that info.

 

I'd also just like to point out that the goal here is to improve the 
out-of-the-box behavior of Connect for users; although workarounds are nice to 
have, the goal here shouldn't be to focus on documenting them but instead, to 
make them obsolete. If we decide not to improve the default behavior of Connect 
then we can document this somewhere else that's a little more visible for users 
as opposed to developers. And there aren't any technical limitations preventing 
us from choosing a non-cooperative assignor right now and running with it, so 
if worst comes to worst, we might consider switching to, e.g., the 
{{RoundRobinAssignor}} and calling that good enough.


was (Author: chrisegerton):
Fun fact--Connect does not like cooperative consumers. See 
https://issues.apache.org/jira/browse/KAFKA-12487 for details.

Until/unless that's addressed, we can't change the default partitioner for 
Connect to the {{CooperativeStickyAssignor}}.

 

[~rhauch] thanks for the rewrite, I hope that makes things easier to read. 
Responses inline:
{quote}Specifically, isn't it true that after the Connect worker config is 
changed, one rolling restart of the Connect workers will not immediately use 
the cooperative assignor (since the last worker to restart will only use the 
range assignor (the default), and that the cooperative assignor will only be 
used after a second rolling restart.
{quote}
That's 

[jira] [Comment Edited] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302981#comment-17302981
 ] 

Chris Egerton edited comment on KAFKA-12463 at 3/17/21, 1:32 AM:
-

Fun fact--Connect does not like cooperative consumers. See 
https://issues.apache.org/jira/browse/KAFKA-12487 for details.

Until/unless that's addressed, we can't change the default partitioner for 
Connect to the {{CooperativeStickyAssignor}}.

 

[~rhauch] thanks for the rewrite, I hope that makes things easier to read. 
Responses inline:
{quote}Specifically, isn't it true that after the Connect worker config is 
changed, one rolling restart of the Connect workers will not immediately use 
the cooperative assignor (since the last worker to restart will only use the 
range assignor (the default), and that the cooperative assignor will only be 
used after a second rolling restart.
{quote}
That's true if the second rolling restart includes a change to the partition 
assignment strategy for its consumers to only use the 
{{CooperativeStickyAssignor}} and remove the {{RangeAssignor}} from the list, 
yes. Given that and the recently-discovered KAFKA-12487 , I don't even think 
we're at the point of updating the description yet to mention an upgrade 
process that would accommodate the {{CooperativeStickyAssignor}} without having 
a warning notice preceding any hypothetical designs we might implement once 
both of these pain points are addressed. I've updated the description 
accordingly, feel free to make any edits as long as we don't actually instruct 
users how to configure their workers with the {{CooperativeStickyAssignor}} as 
that will lead to bad worker behavior.
{quote}In both standalone and distributed, Connect only updates the task 
configs if and only if the task configs changed. If a user *only* changes the 
{{consumer.override.partition.assignment.strategy}} property in a connector 
config, then the task configs will not be changed and the tasks will not be 
restarted.
{quote}
Ah yes, KAFKA-9228. When I initially discovered that one I was a little alarmed, but it turns out that the overwhelming majority of connectors out there are unaffected, simply by a quirk of how people tend to implement {{Connector::taskConfigs}}. The only cases I've been able to find where this bug comes up are in the file stream connectors. If we believe this is likely to affect other connectors, I personally think we should be addressing that bug instead of working around it or documenting it as a potential gotcha.
{quote}If Connect does attempt to restart the connector's tasks, the herder 
does not wait for the tasks to stop before starting any of them. This means 
that it may take several such connector & task restarts before the cooperative 
partition assignment strategy will take effect.
{quote}
That's a fair point, and it applies to any change of partition assignment 
strategy and not just specifically moving from an eager to a cooperative one. 
This becomes especially likely if a task isn't able to respond to a shutdown 
request within the graceful shutdown period (which defaults to five seconds). 
The workaround here is to enable both partition assignment strategies for the 
consumer with a preference for the desired strategy; that way, the desired 
strategy will take effect as soon as every consumer in the group has been 
updated, and nobody will break beforehand. I'll update the workaround section 
in the description to include that info.

 

I'd also just like to point out that the goal here is to improve the 
out-of-the-box behavior of Connect for users; although workarounds are nice to 
have, the goal here shouldn't be to focus on documenting them but instead, to 
make them obsolete. If we decide not to improve the default behavior of Connect 
then we can document this somewhere else that's a little more visible for users 
as opposed to developers.


was (Author: chrisegerton):
Fun fact--Connect does not like cooperative consumers. See 
https://issues.apache.org/jira/browse/KAFKA-12487 for details.

Until/unless that's addressed, we can't change the default partitioner for 
Connect to the {{CooperativeStickyAssignor}}.

 

[~rhauch] thanks for the rewrite, I hope that makes things easier to read. 
Responses inline:
{quote}Specifically, isn't it true that after the Connect worker config is 
changed, one rolling restart of the Connect workers will not immediately use 
the cooperative assignor (since the last worker to restart will only use the 
range assignor (the default), and that the cooperative assignor will only be 
used after a second rolling restart.
{quote}
That's true if the second rolling restart includes a change to the partition 
assignment strategy for its consumers to only use the 
{{CooperativeStickyAssignor}} and remove the {{RangeAssignor}} from the list, 
yes. Given that and the recently-discovered 

[GitHub] [kafka] jsancio commented on a change in pull request #10334: MINOR: Fix BaseHashTable sizing

2021-03-16 Thread GitBox


jsancio commented on a change in pull request #10334:
URL: https://github.com/apache/kafka/pull/10334#discussion_r595640017



##
File path: metadata/src/main/java/org/apache/kafka/timeline/BaseHashTable.java
##
@@ -56,12 +58,30 @@
 this.elements = new Object[expectedSizeToCapacity(expectedSize)];
 }
 
+/**
+ * Calculate the capacity we should provision, given the expected size.
+ *
+ * Our capacity must always be a power of 2, and never less than 2.
+ */
 static int expectedSizeToCapacity(int expectedSize) {
-if (expectedSize <= 1) {
-return 2;
+if (expectedSize >= MAX_CAPACITY / 2) {
+return MAX_CAPACITY;
+}
+return Math.max(MIN_CAPACITY, roundUpToPowerOfTwo(expectedSize * 2));
+}
+
+private static int roundUpToPowerOfTwo(int i) {
+if (i < 0) {
+return 0;
 }
-double sizeToFit = expectedSize / MAX_LOAD_FACTOR;
-return (int) Math.min(MAX_CAPACITY, Math.ceil(Math.log(sizeToFit) / LN_2));
+i = i - 1;
+i |= i >> 1;
+i |= i >> 2;
+i |= i >> 4;
+i |= i >> 8;
+i |= i >> 16;
+i = i + 1;
+return i < 0 ? MAX_CAPACITY : i;

Review comment:
   Or https://github.com/google/guava/blob/master/guava/src/com/google/common/math/IntMath.java#L56-L72
   
   It is safe to look at, since it is Apache License 2.0.
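   
   For reference, a minimal sketch of the same rounding done with `Integer.numberOfLeadingZeros` (the approach Guava takes in the linked `IntMath` code); the class and method names here are illustrative only, not part of the PR:
   ```java
   public class PowerOfTwoSketch {
       // Round x up to the next power of two; a caller like BaseHashTable would still
       // need to cap the result at its maximum capacity, since 2^31 overflows an int.
       static int roundUpToPowerOfTwo(int x) {
           if (x <= 1) {
               return 1;
           }
           return 1 << (32 - Integer.numberOfLeadingZeros(x - 1));
       }
   
       public static void main(String[] args) {
           System.out.println(roundUpToPowerOfTwo(5));   // 8
           System.out.println(roundUpToPowerOfTwo(16));  // 16
           System.out.println(roundUpToPowerOfTwo(17));  // 32
       }
   }
   ```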





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10292: MINOR: fix client_compatibility_features_test.py

2021-03-16 Thread GitBox


rondagostino commented on a change in pull request #10292:
URL: https://github.com/apache/kafka/pull/10292#discussion_r595639780



##
File path: tests/kafkatest/tests/client/client_compatibility_features_test.py
##
@@ -81,8 +81,12 @@ def __init__(self, test_context):
 self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics=self.topics)
 
 def invoke_compatibility_program(self, features):
-# Run the compatibility test on the first Kafka node.
-node = self.kafka.nodes[0]
+if self.zk:
+# kafka nodes are set to older version so resolved script path is linked to older assembly.
+# run the compatibility test on the first zk node to get script path linked to latest(dev) assembly.
+node = self.zk.nodes[0]
+else:
+node = self.kafka.nodes[0]

Review comment:
   We can use the dev version of the tool on the Kafka node via code like 
this:
   ```
   node = self.kafka.nodes[0]
   cmd = ("%s org.apache.kafka.tools.ClientCompatibilityTest "
  "--bootstrap-server %s "
  "--num-cluster-nodes %d "
  "--topic %s " % (self.dev_script_path,
  self.kafka.bootstrap_servers(),
  len(self.kafka.nodes),
  list(self.topics.keys())[0]))
   ```
   
   And then further down we can define the DEV script path like this:
   
   ```
   # Always use the latest version of org.apache.kafka.tools.ClientCompatibilityTest
   # so store away the path to the DEV version before we set the Kafka version
   self.dev_script_path = self.kafka.path.script("kafka-run-class.sh", self.kafka.nodes[0])
   self.kafka.set_version(KafkaVersion(broker_version))
   ```
   
   I tested this locally and it solves the problem.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

2021-03-16 Thread GitBox


cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595633992



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
 continue;
 }
 Node node = entry.getKey();
+if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+log.trace("Still waiting for other calls to finish on node {}.", node);
+nodeReadyDeadlines.remove(node);
+continue;

Review comment:
   Oh, and on the question of why the in-flight requests structure holds a list: it was done for future-proofing.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

2021-03-16 Thread GitBox


cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595633668



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
 continue;
 }
 Node node = entry.getKey();
+if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+log.trace("Still waiting for other calls to finish on node {}.", node);
+nodeReadyDeadlines.remove(node);
+continue;

Review comment:
   Notice that we set `maxInFlightRequestsPerConnection` to 1 when 
constructing the `NetworkClient`.  We don't support sending multiple requests 
to a single node on a single connection in `AdminClient`.  I think we could add 
this support, but we'd have to check how the server handled it since we've 
never done it before.  Maybe there should be a JIRA.
   
   Also, if we do choose to add this support for multiple outstanding requests 
per node per socket we'd need some way to distinguish between "waiting for a 
chance to use this connection" from "waiting for this connection to be opened" 
in NetworkClient.  Currently ready just returns a boolean, which isn't enough 
information to distinguish these two cases.  We could probably add a new method 
that returned an enum or something.
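   
   Purely as a sketch of that idea (nothing like this exists in `NetworkClient` today; the type and constant names below are made up):
   ```java
   public class ReadinessSketch {
       // Hypothetical replacement for the boolean returned by NetworkClient#ready,
       // distinguishing the two "not ready" cases discussed above.
       enum ConnectionReadiness {
           READY,               // connection is open and can take a request now
           AWAITING_CONNECTION, // still waiting for the connection to be established
           WAITING_FOR_TURN     // connection is open, but another request is in flight
       }
   
       public static void main(String[] args) {
           ConnectionReadiness state = ConnectionReadiness.AWAITING_CONNECTION;
           switch (state) {
               case READY:               System.out.println("send the call"); break;
               case AWAITING_CONNECTION: System.out.println("keep the node-ready deadline running"); break;
               case WAITING_FOR_TURN:    System.out.println("clear the deadline and wait"); break;
           }
       }
   }
   ```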





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

2021-03-16 Thread GitBox


cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595632280



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
 continue;
 }
 Node node = entry.getKey();
+if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+log.trace("Still waiting for other calls to finish on node {}.", node);
+nodeReadyDeadlines.remove(node);
+continue;
+}
 if (!client.ready(node, now)) {
+Long deadline = nodeReadyDeadlines.get(node);
+if (deadline != null) {
+if (now >= deadline) {
+log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+"because the node is taking too long to become ready.",
+node.idString(), calls.size());
+transitionToPendingAndClearList(calls);
+client.disconnect(node.idString());
+nodeReadyDeadlines.remove(node);
+iter.remove();
+continue;
+}
+pollTimeout = Math.min(pollTimeout, deadline - now);
+} else {
+nodeReadyDeadlines.put(node, now + requestTimeoutMs);
+}
 long nodeTimeout = client.pollDelayMs(node, now);
 pollTimeout = Math.min(pollTimeout, nodeTimeout);
 log.trace("Client is not ready to send to {}. Must delay {} ms", node, nodeTimeout);
 continue;
 }
-Call call = calls.remove(0);
-int requestTimeoutMs = Math.min(KafkaAdminClient.this.requestTimeoutMs,
+int remainingRequestTime;
+Long deadlineMs = nodeReadyDeadlines.remove(node);

Review comment:
   I added a comment





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

2021-03-16 Thread GitBox


cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595631870



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
 continue;
 }
 Node node = entry.getKey();
+if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+log.trace("Still waiting for other calls to finish on node {}.", node);
+nodeReadyDeadlines.remove(node);
+continue;
+}
 if (!client.ready(node, now)) {
+Long deadline = nodeReadyDeadlines.get(node);
+if (deadline != null) {
+if (now >= deadline) {
+log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+"because the node is taking too long to become ready.",
+node.idString(), calls.size());
+transitionToPendingAndClearList(calls);
+client.disconnect(node.idString());
+nodeReadyDeadlines.remove(node);
+iter.remove();
+continue;
+}
+pollTimeout = Math.min(pollTimeout, deadline - now);
+} else {
+nodeReadyDeadlines.put(node, now + requestTimeoutMs);

Review comment:
   The complexity of the min / max issue is one thing.  Another thing is 
that we don't know when the connection has been established, and when it has 
not.  NetworkClient doesn't expose this information.  `NetworkClient#ready` may 
return false for a variety of reasons, many of which are not indicative of 
connection establishment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

2021-03-16 Thread GitBox


cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595631316



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1136,17 +1185,12 @@ private void timeoutCallsInFlight(TimeoutProcessor processor) {
 // only one we need to check the timeout for.
 Call call = contexts.get(0);
 if (processor.callHasExpired(call)) {
-if (call.aborted) {
-log.warn("Aborted call {} is still in callsInFlight.", call);
-} else {
-log.debug("Closing connection to {} due to timeout while awaiting {}", nodeId, call);
-call.aborted = true;
-client.disconnect(nodeId);
-numTimedOut++;
-// We don't remove anything from the callsInFlight data structure. Because the connection
-// has been closed, the calls should be returned by the next client#poll(),
-// and handled at that point.
-}
+log.debug("Disconnecting from {} due to timeout while awaiting {}", nodeId, call);

Review comment:
   OK, let's raise it to `INFO`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.
h3. Proposed Change

*NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will 
not work as consumers will still perform eager rebalancing as long as at least 
one of the partition assignors they are configured with does not support 
cooperative rebalancing. KAFKA-12487 should also be addressed before 
configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any 
sink connectors.*

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters."

We should alter the default consumer configuration for sink tasks to use the 
new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by changing the {{Worker}} to set the following on the consumer 
configuration created for each sink connector task:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically. But, this setting will be 
overwritten by any user-specified {{consumer.partition.assignment.strategy}} 
property in the worker configuration, and by any user-specified 
{{consumer.override.partition.assignment.strategy}} property in a sink connector configuration when per-connector client overrides are enabled in the worker config with {{connector.client.config.override.policy=ALL}}.
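
For illustration, a minimal sketch (not part of the proposal itself) of what the resulting consumer configuration amounts to, written against the public client API; the bootstrap address is a placeholder:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class AssignorConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // List the cooperative assignor first and keep RangeAssignor as a fallback so a
        // rolling upgrade never mixes incompatible protocols within one consumer group.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName());
        System.out.println(props.getProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
    }
}
{code}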

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, should only be applied to the Connect framework in an upcoming minor 
release.
h3. Manually setting the partition assignment strategy

There is a simple workaround to achieve the same behavior in AK releases 2.4 and later that don't also include this improvement: either set a value for the {{consumer.partition.assignment.strategy}} property in the *worker configuration*, or set a value for the {{consumer.override.partition.assignment.strategy}} property in one or more *connector configurations* when per-connector client overrides are enabled in the worker config with {{connector.client.config.override.policy=ALL}}.

In order to avoid task failures while the connector is being reconfigured, it 
is highly recommended that the consumer be configured with a list of both the 
new and the current partition assignment strategies, instead of just the new 
partition assignment strategy. For example, to update a 

[jira] [Comment Edited] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions

2021-03-16 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302980#comment-17302980
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-12485 at 3/17/21, 12:04 AM:
---

Side note: if we get this optimization in then we should be able to skip the 
last(\?) followup for KIP-572: handling TimeoutExceptions thrown from the 
mainConsumer#committed call when reinitializing offsets for corrupted tasks in 
TaskManager#closeAndRevive. cc [~mjsax]


was (Author: ableegoldman):
Side note: if we get this optimization in then we should be able to skip the 
last (\?) followup for KIP-572: handling TimeoutExceptions thrown from the 
mainConsumer#committed call when reinitializing offsets for corrupted tasks in 
TaskManager#closeAndRevive. cc [~mjsax]

> Speed up Consumer#committed by returning cached offsets for owned partitions
> 
>
> Key: KAFKA-12485
> URL: https://issues.apache.org/jira/browse/KAFKA-12485
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie++
>
> All of the KafkaConsumer#committed APIs will currently make a remote blocking 
> call to the server to fetch the committed offsets. This is typically used to 
> reset the offsets after a crash or restart, or to fetch offsets for other 
> consumers in the group. However some users may wish to invoke this API on 
> partitions which are currently owned by the Consumer, in which case the 
> remote call is unnecessary since it should be able to just keep track of what 
> it has itself committed.
> We should consider optimizing these APIs to just return the cached offsets in 
> place of the remote call when passed in only partitions that are currently 
> owned. This is similar to what we do in Consumer#position, although there we 
> have a guarantee that the partitions are owned by the Consumer whereas in 
> #committed we do not
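
For context, a minimal sketch of the call being discussed, using the public consumer API; the bootstrap address, group id, and topic are placeholders:
{code:java}
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommittedLookupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");             // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("example-topic", 0);         // placeholder
            consumer.assign(Set.of(tp));

            // Today this always issues a blocking offset-fetch to the broker, even when tp is
            // assigned to this very consumer; the proposal above is to serve such lookups
            // from the consumer's own cache instead.
            Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(Set.of(tp), Duration.ofSeconds(5));
            System.out.println(committed);
        }
    }
}
{code}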



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions

2021-03-16 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302980#comment-17302980
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-12485 at 3/17/21, 12:03 AM:
---

Side note: if we get this optimization in then we should be able to skip the 
last (\?) followup for KIP-572: handling TimeoutExceptions thrown from the 
mainConsumer#committed call when reinitializing offsets for corrupted tasks in 
TaskManager#closeAndRevive. cc [~mjsax]


was (Author: ableegoldman):
Side note: if we get this optimization in then we should be able to skip the 
last (?) followup for KIP-572: handling TimeoutExceptions thrown from the 
mainConsumer#committed call when reinitializing offsets for corrupted tasks in 
TaskManager#closeAndRevive. cc [~mjsax]

> Speed up Consumer#committed by returning cached offsets for owned partitions
> 
>
> Key: KAFKA-12485
> URL: https://issues.apache.org/jira/browse/KAFKA-12485
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie++
>
> All of the KafkaConsumer#committed APIs will currently make a remote blocking 
> call to the server to fetch the committed offsets. This is typically used to 
> reset the offsets after a crash or restart, or to fetch offsets for other 
> consumers in the group. However some users may wish to invoke this API on 
> partitions which are currently owned by the Consumer, in which case the 
> remote call is unnecessary since it should be able to just keep track of what 
> it has itself committed.
> We should consider optimizing these APIs to just return the cached offsets in 
> place of the remote call when passed in only partitions that are currently 
> owned. This is similar to what we do in Consumer#position, although there we 
> have a guarantee that the partitions are owned by the Consumer whereas in 
> #committed we do not



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302981#comment-17302981
 ] 

Chris Egerton edited comment on KAFKA-12463 at 3/17/21, 12:03 AM:
--

Fun fact--Connect does not like cooperative consumers. See 
https://issues.apache.org/jira/browse/KAFKA-12487 for details.

Until/unless that's addressed, we can't change the default partitioner for 
Connect to the {{CooperativeStickyAssignor}}.

 

[~rhauch] thanks for the rewrite, I hope that makes things easier to read. 
Responses inline:
{quote}Specifically, isn't it true that after the Connect worker config is 
changed, one rolling restart of the Connect workers will not immediately use 
the cooperative assignor (since the last worker to restart will only use the 
range assignor (the default), and that the cooperative assignor will only be 
used after a second rolling restart.
{quote}
That's true if the second rolling restart includes a change to the partition 
assignment strategy for its consumers to only use the 
{{CooperativeStickyAssignor}} and remove the {{RangeAssignor}} from the list, 
yes. Given that and the recently-discovered KAFKA-12487 , I don't even think 
we're at the point of updating the description yet to mention an upgrade 
process that would accommodate the {{CooperativeStickyAssignor}} without having 
a warning notice preceding any hypothetical designs we might implement once 
both of these pain points are addressed. I've updated the description 
accordingly, feel free to make any edits as long as we don't actually instruct 
users how to configure their workers with the {{CooperativeStickyAssignor}} as 
that will lead to bad worker behavior.
{quote}In both standalone and distributed, Connect only updates the task 
configs if and only if the task configs changed. If a user *only* changes the 
{{consumer.override.partition.assignment.strategy}} property in a connector 
config, then the task configs will not be changed and the tasks will not be 
restarted.
{quote}
Ah yes, KAFKA-9228. When I initially discovered that one I was a little alarmed, but it turns out that the overwhelming majority of connectors out there are unaffected, simply by a quirk of how people tend to implement {{Connector::taskConfigs}}. The only cases I've been able to find where this bug comes up are in the file stream connectors. If we believe this is likely to affect other connectors, I personally think we should be addressing that bug instead of working around it or documenting it as a potential gotcha.
{quote}If Connect does attempt to restart the connector's tasks, the herder 
does not wait for the tasks to stop before starting any of them. This means 
that it may take several such connector & task restarts before the cooperative 
partition assignment strategy will take effect.
{quote}
That's a fair point, and it applies to any change of partition assignment 
strategy and not just specifically moving from an eager to a cooperative one. 
This becomes especially likely if a task isn't able to respond to a shutdown 
request within the graceful shutdown period (which defaults to five seconds). 
The workaround here is to enable both partition assignment strategies for the 
consumer with a preference for the desired strategy; that way, the desired 
strategy will take effect as soon as every consumer in the group has been 
updated, and nobody will break beforehand. I'll update the workaround section 
in the description to include that info.
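
For reference, a minimal example of such a connector-level override (assuming 
per-connector client overrides are enabled in the worker config) would be:
{code:java}
consumer.override.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}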

 

I'd also just like to point out that the goal here is to improve the 
out-of-the-box behavior of Connect for users; although workarounds are nice to 
have, the goal shouldn't be to focus on documenting them but instead to make 
them obsolete. If we decide not to improve the default behavior of Connect 
then we can document this somewhere else that's a little more visible for users 
as opposed to developers.


was (Author: chrisegerton):
Fun fact--Connect does not like cooperative consumers. See 
https://issues.apache.org/jira/browse/KAFKA-12487 for details.

Until/unless that's addressed, we can't change the default partition assignor for 
Connect to the {{CooperativeStickyAssignor}}.

 

[~rhauch] thanks for the rewrite, I hope that makes things easier to read. 
Responses inline:
{quote}Specifically, isn't it true that after the Connect worker config is 
changed, one rolling restart of the Connect workers will not immediately use 
the cooperative assignor (since the last worker to restart will only use the 
range assignor (the default), and that the cooperative assignor will only be 
used after a second rolling restart.
{quote}
That's true if the second rolling restart includes a change to the partition 
assignment strategy for its consumers to only use the 
{{CooperativeStickyAssignor}} and remove the {{RangeAssignor}} from the list, 
yes. Given that and the recently-discovered 

[jira] [Comment Edited] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions

2021-03-16 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302980#comment-17302980
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-12485 at 3/17/21, 12:03 AM:
---

Side note: if we get this optimization in then we should be able to skip the 
last (?) followup for KIP-572: handling TimeoutExceptions thrown from the 
mainConsumer#committed call when reinitializing offsets for corrupted tasks in 
TaskManager#closeAndRevive. cc [~mjsax]


was (Author: ableegoldman):
Side note: if we get this optimization in then we should be able to skip the 
last(?) followup for KIP-572: handling TimeoutExceptions thrown from the 
mainConsumer#committed call when reinitializing offsets for corrupted tasks in 
TaskManager#closeAndRevive. cc [~mjsax]

> Speed up Consumer#committed by returning cached offsets for owned partitions
> 
>
> Key: KAFKA-12485
> URL: https://issues.apache.org/jira/browse/KAFKA-12485
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie++
>
> All of the KafkaConsumer#committed APIs will currently make a remote blocking 
> call to the server to fetch the committed offsets. This is typically used to 
> reset the offsets after a crash or restart, or to fetch offsets for other 
> consumers in the group. However some users may wish to invoke this API on 
> partitions which are currently owned by the Consumer, in which case the 
> remote call is unnecessary since it should be able to just keep track of what 
> it has itself committed.
> We should consider optimizing these APIs to just return the cached offsets in 
> place of the remote call when passed in only partitions that are currently 
> owned. This is similar to what we do in Consumer#position, although there we 
> have a guarantee that the partitions are owned by the Consumer whereas in 
> #committed we do not



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302981#comment-17302981
 ] 

Chris Egerton commented on KAFKA-12463:
---

Fun fact--Connect does not like cooperative consumers. See 
https://issues.apache.org/jira/browse/KAFKA-12487 for details.

Until/unless that's addressed, we can't change the default partition assignor for 
Connect to the {{CooperativeStickyAssignor}}.

 

[~rhauch] thanks for the rewrite, I hope that makes things easier to read. 
Responses inline:
{quote}Specifically, isn't it true that after the Connect worker config is 
changed, one rolling restart of the Connect workers will not immediately use 
the cooperative assignor (since the last worker to restart will only use the 
range assignor (the default), and that the cooperative assignor will only be 
used after a second rolling restart.
{quote}
That's true if the second rolling restart includes a change to the partition 
assignment strategy for its consumers to only use the 
{{CooperativeStickyAssignor}} and remove the {{RangeAssignor}} from the list, 
yes. Given that and the recently-discovered KAFKA-12487, I don't even think 
we're at the point of updating the description yet to mention an upgrade 
process that would accommodate the {{CooperativeStickyAssignor}} without having 
a warning notice preceding any hypothetical designs we might implement once 
both of these pain points are addressed. I've updated the description 
accordingly, feel free to make any edits as long as we don't actually instruct 
users how to configure their workers with the {{CooperativeStickyAssignor}} as 
that will lead to bad worker behavior.
{quote}In both standalone and distributed, Connect only updates the task 
configs if and only if the task configs changed. If a user *only* changes the 
{{consumer.override.partition.assignment.strategy}} property in a connector 
config, then the task configs will not be changed and the tasks will not be 
restarted.
{quote}
Ah yes, KAFKA-9228. When I initially discovered that one I was a little alarmed, 
but it turns out that the overwhelming majority of connectors out there are 
unaffected, simply by a quirk of how people tend to implement 
{{Connector::taskConfigs}}. The only cases I've been able to find where this 
bug comes up are in the file stream connectors. If we believe this is likely to 
affect other connectors, I personally think we should be addressing that bug 
instead of working around it or documenting it as a potential gotcha.
{quote}If Connect does attempt to restart the connector's tasks, the herder 
does not wait for the tasks to stop before starting any of them. This means 
that it may take several such connector & task restarts before the cooperative 
partition assignment strategy will take effect.
{quote}
That's a fair point, and it applies to any change of partition assignment 
strategy and not just specifically moving from an eager to a cooperative one. 
This becomes especially likely if a task isn't able to respond to a shutdown 
request within the graceful shutdown period (which defaults to five seconds). 
The workaround here is to enable both partition assignment strategies for the 
consumer with a preference for the desired strategy; that way, the desired 
strategy will take effect as soon as every consumer in the group has been 
updated, and nobody will break beforehand. I'll update the workaround section 
in the description to include that info.

 

I'd like to point out that the goal here is to improve the out-of-the-box 
behavior of Connect for users; although workarounds are nice to have, the goal 
shouldn't be to focus on documenting them but instead to make them obsolete. 
If we decide not to improve the default behavior of Connect then we can 
document this somewhere else that's a little more visible for users as opposed 
to developers.

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> 

[jira] [Commented] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions

2021-03-16 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302980#comment-17302980
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12485:


Side note: if we get this optimization in then we should be able to skip the 
last(?) followup for KIP-572: handling TimeoutExceptions thrown from the 
mainConsumer#committed call when reinitializing offsets for corrupted tasks in 
TaskManager#closeAndRevive. cc [~mjsax]

> Speed up Consumer#committed by returning cached offsets for owned partitions
> 
>
> Key: KAFKA-12485
> URL: https://issues.apache.org/jira/browse/KAFKA-12485
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie++
>
> All of the KafkaConsumer#committed APIs will currently make a remote blocking 
> call to the server to fetch the committed offsets. This is typically used to 
> reset the offsets after a crash or restart, or to fetch offsets for other 
> consumers in the group. However some users may wish to invoke this API on 
> partitions which are currently owned by the Consumer, in which case the 
> remote call is unnecessary since it should be able to just keep track of what 
> it has itself committed.
> We should consider optimizing these APIs to just return the cached offsets in 
> place of the remote call when passed in only partitions that are currently 
> owned. This is similar to what we do in Consumer#position, although there we 
> have a guarantee that the partitions are owned by the Consumer whereas in 
> #committed we do not



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions

2021-03-16 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12485:
---
Description: 
All of the KafkaConsumer#committed APIs will currently make a remote blocking 
call to the server to fetch the committed offsets. This is typically used to 
reset the offsets after a crash or restart, or to fetch offsets for other 
consumers in the group. However some users may wish to invoke this API on 
partitions which are currently owned by the Consumer, in which case the remote 
call is unnecessary since it should be able to just keep track of what it has 
itself committed.

We should consider optimizing these APIs to just return the cached offsets in 
place of the remote call when passed in only partitions that are currently 
owned. This is similar to what we do in Consumer#position, although there we 
have a guarantee that the partitions are owned by the Consumer whereas in 
#committed we do not
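
A rough sketch of the idea, written as an external helper for illustration only 
(the cache and helper shown here are hypothetical, not existing KafkaConsumer 
internals):
{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical cache of offsets this consumer has committed for its owned partitions.
public final class CommittedOffsetCache {
    private final Map<TopicPartition, OffsetAndMetadata> lastCommitted = new HashMap<>();

    // Record offsets whenever the consumer commits them.
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        lastCommitted.putAll(offsets);
    }

    // Return cached offsets when every requested partition is currently owned and
    // has a cached entry; otherwise fall back to the existing remote blocking call.
    public Map<TopicPartition, OffsetAndMetadata> committed(Consumer<?, ?> consumer,
                                                            Set<TopicPartition> partitions,
                                                            Duration timeout) {
        if (consumer.assignment().containsAll(partitions)
                && lastCommitted.keySet().containsAll(partitions)) {
            Map<TopicPartition, OffsetAndMetadata> cached = new HashMap<>();
            for (TopicPartition tp : partitions) {
                cached.put(tp, lastCommitted.get(tp));
            }
            return cached;
        }
        return consumer.committed(partitions, timeout);
    }
}
{code}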

  was:
All of the KafkaConsumer#committed APIs will currently make a remote blocking 
call to the server to fetch the committed offsets. This is typically used to 
reset the offsets after a crash or restart, or to fetch offsets for other 
consumers in the group. However some users may wish to invoke this API on 
partitions which are currently owned by the Consumer, in which case the remote 
call is unnecessary since those offsets should already be known.

We should consider optimizing these APIs to just return the cached offsets in 
place of the remote call when passed in only partitions that are currently 
owned. This is similar to what we do in Consumer#position, although there we 
have a guarantee that the partitions are owned by the Consumer whereas in 
#committed we do not


> Speed up Consumer#committed by returning cached offsets for owned partitions
> 
>
> Key: KAFKA-12485
> URL: https://issues.apache.org/jira/browse/KAFKA-12485
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie++
>
> All of the KafkaConsumer#committed APIs will currently make a remote blocking 
> call to the server to fetch the committed offsets. This is typically used to 
> reset the offsets after a crash or restart, or to fetch offsets for other 
> consumers in the group. However some users may wish to invoke this API on 
> partitions which are currently owned by the Consumer, in which case the 
> remote call is unnecessary since it should be able to just keep track of what 
> it has itself committed.
> We should consider optimizing these APIs to just return the cached offsets in 
> place of the remote call when passed in only partitions that are currently 
> owned. This is similar to what we do in Consumer#position, although there we 
> have a guarantee that the partitions are owned by the Consumer whereas in 
> #committed we do not



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.
h3. Proposed Change

*NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will 
not work as consumers will still perform eager rebalancing as long as at least 
one of the partition assignors they are configured with does not support 
cooperative rebalancing. KAFKA-12487 should also be addressed before 
configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any 
sink connectors.*

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters."

We should alter the default consumer configuration for sink tasks to use the 
new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by changing the {{Worker}} to set the following on the consumer 
configuration created for each sink connector task:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically. But, this setting will be 
overwritten by any user-specified {{consumer.partition.assignment.strategy}} 
property in the worker configuration, and by any user-specified 
{{consumer.override.partition.assignment.strategy}} property in a sink 
connector configuration when per-connector client overrides are enabled in the 
worker config with {{connector.client.config.override.policy=ALL}}.

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, should only be applied to the Connect framework in an upcoming minor 
release.
h3. Manually setting the partition assignment strategy

There is a simple workaround to achieve the same behavior in AK releases 2.4 
and later that don't also include this improvement: either set a value for the 
{{consumer.partition.assignment.strategy}} property in the *worker 
configuration*, or set a value for the 
{{consumer.override.partition.assignment.strategy}} property in one or more 
*connector configurations* when per-connector client overrides are enabled in 
the worker config with {{connector.client.config.override.policy=ALL}}.
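
Concretely, the worker-level option would look like:
{code:java}
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
and the connector-level option like:
{code:java}
consumer.override.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}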

  was:
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 

[GitHub] [kafka] rondagostino commented on pull request #10292: MINOR: fix client_compatibility_features_test.py

2021-03-16 Thread GitBox


rondagostino commented on pull request #10292:
URL: https://github.com/apache/kafka/pull/10292#issuecomment-800685338


   > Can we just use a separate node for the client, unconditionally
   
   We don't have a `Service` for it, and we currently invoke it in a node that 
we allocate for the test.  With no ZooKeeper all we have is Kafka (version >= 
2.8, of course).
   
   I could see a situation where we add functionality to 
`org.apache.kafka.tools.ClientCompatibilityTest` in some future version and 
then we try to run that against Kafka v2.8 -- and then it'll go BOOM! as you 
suggest.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.
h3. Proposed Change

*NOTE: Until/unless*

 

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters."

We should alter the default consumer configuration for sink tasks to use the 
new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by changing the {{Worker}} to set the following on the consumer 
configuration created for each sink connector task:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically. But, this setting will be 
overwritten by any user-specified {{consumer.partition.assignment.strategy}} 
property in the worker configuration, and by any user-specified 
{{consumer.override.partition.assignment.strategy}} property in a sink 
connector configuration when per-connector client overrides are enabled in the 
worker config with {{connector.client.config.override.policy=ALL}}.

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, should only be applied to the Connect framework in an upcoming minor 
release.
h3. Manually setting the partition assignment strategy

There is a simple workaround to achieve the same behavior in AK releases 2.4 
and later that don't also include this fix: either set the following in the 
*worker configuration*:
{code:java}
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
or set the following in *connector configurations* when per-connector client 
overrides are enabled in the worker config with 
{{connector.client.config.override.policy=ALL}}:
{code:java}
consumer.override.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}

  was:
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 

[GitHub] [kafka] rondagostino commented on pull request #10292: MINOR: fix client_compatibility_features_test.py

2021-03-16 Thread GitBox


rondagostino commented on pull request #10292:
URL: https://github.com/apache/kafka/pull/10292#issuecomment-800679385


   > use the devel version of the tool when ZK is enabled, and the old version 
when it's not
   
   The code as amended in this PR will use the devel version when ZooKeeper is 
enabled (which is the way it always was except after I changed/broke it) and 
then, when Zookeeper is not enabled -- which is a new case -- it will use the 
Kafka version, which by definition will be >= 2.8.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12473) Make the CooperativeStickyAssignor the default assignor

2021-03-16 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302970#comment-17302970
 ] 

Chris Egerton commented on KAFKA-12473:
---

This doesn't necessarily block the change proposed here, but we should take 
care to make sure that https://issues.apache.org/jira/browse/KAFKA-12487 
doesn't cause sink connectors to suddenly begin failing on Connect workers 
after an upgrade to 3.0.

 

It'd be fantastic if we could patch it in time for 3.0, but if not, we'll have 
to hard-code Connect to use a different, non-cooperative partition assignor by 
default.

> Make the CooperativeStickyAssignor the default assignor
> ---
>
> Key: KAFKA-12473
> URL: https://issues.apache.org/jira/browse/KAFKA-12473
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Now that 3.0 is coming up, we can change the default 
> ConsumerPartitionAssignor to something better than the RangeAssignor. The 
> original plan was to switch over to the StickyAssignor, but now that we have 
> incremental cooperative rebalancing we should consider using the new 
> CooperativeStickyAssignor instead: this will enable the consumer group to 
> follow the COOPERATIVE protocol, improving the rebalancing experience OOTB.
> Note that this will require users to follow the [upgrade path laid out in 
> KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429:+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-Consumer]
>  to safely perform a rolling upgrade. When we change the default assignor we 
> need to make sure this is clearly documented in the upgrade guide for 3.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino edited a comment on pull request #10297: MINOR: fix failing ZooKeeper system tests

2021-03-16 Thread GitBox


rondagostino edited a comment on pull request #10297:
URL: https://github.com/apache/kafka/pull/10297#issuecomment-800676314


   @cmccabe All set.  I added the comment and also opened 
https://issues.apache.org/jira/browse/KAFKA-12488 for the refactor.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #10297: MINOR: fix failing ZooKeeper system tests

2021-03-16 Thread GitBox


rondagostino commented on pull request #10297:
URL: https://github.com/apache/kafka/pull/10297#issuecomment-800676314


   @cmccabe All set.  I added the comment and also opened 
https://issues.apache.org/jira/browse/KAFKA-12488 for the refactor.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12488) Be more specific about enabled SASL mechanisms in system tests

2021-03-16 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-12488:
-

 Summary: Be more specific about enabled SASL mechanisms in system 
tests
 Key: KAFKA-12488
 URL: https://issues.apache.org/jira/browse/KAFKA-12488
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Reporter: Ron Dagostino


The `SecurityConfig.enabled_sasl_mechanisms()` method simply returns all SASL 
mechanisms that are enabled for the test -- whether for brokers, clients, 
controllers, or ZooKeeper.  These enabled mechanisms determine what appears in 
the JAAS config files.  For example, the entire list of enabled mechanisms is 
used in both the KafkaClient{} and KafkaServer{} sections, but that's way too 
broad.  We should be more precise about which mechanisms we are interested in 
for the different sections of these JAAS config files.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12487) Sink connectors do not work with the cooperative consumer rebalance protocol

2021-03-16 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-12487:
-

 Summary: Sink connectors do not work with the cooperative consumer 
rebalance protocol
 Key: KAFKA-12487
 URL: https://issues.apache.org/jira/browse/KAFKA-12487
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.0.0, 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
Reporter: Chris Egerton
Assignee: Chris Egerton


The {{ConsumerRebalanceListener}} used by the framework to respond to rebalance 
events in consumer groups for sink tasks is hard-coded with the assumption that 
the consumer performs rebalances eagerly. In other words, it assumes that 
whenever {{onPartitionsRevoked}} is called, all partitions have been revoked 
from that consumer, and whenever {{onPartitionsAssigned}} is called, the 
partitions passed in to that method comprise the complete set of topic 
partitions assigned to that consumer.

See the [WorkerSinkTask.HandleRebalance 
class|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L730]
 for the specifics.
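
As a minimal sketch of the difference (illustrative only, not the framework's 
actual listener), a cooperative-aware listener has to maintain the full 
assignment itself, since under the cooperative protocol each callback only 
reports the incremental change:
{code:java}
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Minimal sketch of a cooperative-aware listener (not WorkerSinkTask's code).
public class IncrementalRebalanceListener implements ConsumerRebalanceListener {
    private final Set<TopicPartition> owned = new HashSet<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Under COOPERATIVE this is only the revoked subset, not the full assignment.
        owned.removeAll(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Under COOPERATIVE this is only the newly added partitions.
        owned.addAll(partitions);
    }
}
{code}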

 

One issue this can cause is silently ignoring to-be-committed offsets provided 
by sink tasks, since the framework ignores offsets provided by tasks in their 
{{preCommit}} method if it does not believe that the consumer for that task is 
currently assigned the topic partition for that offset. See these lines in the 
[WorkerSinkTask::commitOffsets 
method|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L429-L430]
 for reference.

 

This may not be the only issue caused by configuring a sink connector's 
consumer to use cooperative rebalancing. Rigorous unit and integration testing 
should be added before claiming that the Connect framework supports the use of 
cooperative consumers with sink connectors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #10292: MINOR: fix client_compatibility_features_test.py

2021-03-16 Thread GitBox


cmccabe commented on pull request #10292:
URL: https://github.com/apache/kafka/pull/10292#issuecomment-800670554


   @rondagostino : thanks for the explanation.
   
   It's a bit weird to use the devel version of the tool when ZK is enabled, 
and the old version when it's not.  I think this will lead to some odd issues 
down the road.
   
   @chia7712 : Can we just use a separate node for the client, unconditionally? 
 Then that separate node can always have the devel software.
   
   Or, I suppose, we could change the path that the compat tool is invoked via 
(maybe that's annoying, though...)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10297: MINOR: fix failing ZooKeeper system tests

2021-03-16 Thread GitBox


cmccabe commented on pull request #10297:
URL: https://github.com/apache/kafka/pull/10297#issuecomment-800668936


   Sure, we can refactor this later if that's easier.
   
   Can you add a comment to `SecurityConfig#enabled_sasl_mechanisms` describing 
what it's supposed to return, though?  If it should return every possible sasl 
mechanism in use (zk, controller, broker, client) then let's document that



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12482) Remove deprecated Connect worker configs

2021-03-16 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302957#comment-17302957
 ] 

Randall Hauch commented on KAFKA-12482:
---

We've not yet decided whether it's worth it to remove these. See [Changing 
Defaults in Connect 3.0.0 wiki 
page|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177047362]
 and discussion thread.

> Remove deprecated Connect worker configs
> 
>
> Key: KAFKA-12482
> URL: https://issues.apache.org/jira/browse/KAFKA-12482
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Priority: Critical
> Fix For: 3.0.0
>
>
> The following Connect worker configuration properties were deprecated and 
> should be removed in 3.0.0:
>  * {{rest.host.name}} (deprecated in 
> [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface])
>  * {{rest.port}} (deprecated in 
> [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface])
>  * {{internal.key.converter}} (deprecated in 
> [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig])
>  * {{internal.value.converter}} (deprecated in 
> [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig])



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status

2021-03-16 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302952#comment-17302952
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12472:


By the way, I just filed https://issues.apache.org/jira/browse/KAFKA-12486 
which could be relevant as it would add another possible reason for Streams to 
trigger a rebalance

> Add a Consumer / Streams metric to indicate the current rebalance status
> 
>
> Key: KAFKA-12472
> URL: https://issues.apache.org/jira/browse/KAFKA-12472
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> Today to trouble shoot a rebalance issue operators need to do a lot of manual 
> steps: locating the problematic members, search in the log entries, and look 
> for related metrics. It would be great to add a single metric that covers all 
> these manual steps and operators would only need to check this single signal 
> to check what is the root cause. A concrete idea is to expose two enum gauge 
> metrics on consumer and streams, respectively:
> * Consumer level (the order below is by design, see Streams level for 
> details):
>   0. *None* => there is no rebalance ongoing.
>   1. *CoordinatorRequested* => any of the coordinator response contains a 
> RebalanceInProgress error code.
>   2. *NewMember* => when the join group response has a MemberIdRequired error 
> code.
>   3. *UnknownMember* => when any of the coordinator response contains an 
> UnknownMember error code, indicating this member is already kicked out of the 
> group.
>   4. *StaleMember* => when any of the coordinator response contains an 
> IllegalGeneration error code.
>   5. *DroppedGroup* => when hb thread decides to call leaveGroup due to hb 
> expired.
>   6. *UserRequested* => when leaveGroup upon the shutdown / unsubscribeAll 
> API, as well as upon calling the enforceRebalance API.
>   7. *MetadataChanged* => requestRejoin triggered since metadata has changed.
>   8. *SubscriptionChanged* => requestRejoin triggered since subscription has 
> changed.
>   9. *RetryOnError* => when join/syncGroup response contains a retriable 
> error which would cause the consumer to backoff and retry.
>  10. *RevocationNeeded* => requestRejoin triggered since revoked partitions 
> is not empty.
> The transition rule is that a non-zero status code can only transition to 
> zero or to a higher code, but not to a lower code (same for streams, see 
> rationales below).
> * Streams level: today a streams client can have multiple consumers. We 
> introduced some new enum states as well as aggregation rules across 
> consumers: if there are no streams-layer events as below that transition its 
> status (i.e. the streams layer thinks it is 0), then we aggregate across all 
> the embedded consumers and take the largest status code value as the streams 
> metric; if there are streams-layer events that determine its status should 
> be in 10+, then it ignores all embedded consumer-layer status codes since it 
> should have a higher precedence. In addition, when creating an aggregated 
> metric across streams instances (a.k.a. at the app level, which is usually 
> what we would care about and alert on), we also follow the same aggregation 
> rule, e.g. if there are two streams instances where one instance's status 
> code is 1) and the other is 10), then the app's status is 10).
>  10. *RevocationNeeded* => the definition of this is changed to the original 
> 10) defined in consumer above, OR leader decides to revoke either 
> active/standby tasks and hence schedule follow-ups.
>  11. *AssignmentProbing* => leader decides to schedule follow-ups since the 
> current assignment is unstable.
>  12. *VersionProbing* => leader decides to schedule follow-ups due to version 
> probing.
>  13. *EndpointUpdate* => anyone decides to schedule follow-ups due to 
> endpoint updates.
> The main motivations of the above proposed precedence order are the following:
> 1. When a rebalance is triggered by one member, all other members would only 
> know it is due to CoordinatorRequested from coordinator error codes, and 
> hence CoordinatorRequested should be overridden by any other status when 
> aggregating across clients.
> 2. DroppedGroup could cause unknown/stale members that would fail and retry 
> immediately, and hence should take higher precedence.
> 3. Revocation definition is extended in Streams, and hence it needs to take 
> the highest precedence among all consumer-only status so that it would not be 
> overridden by any of the consumer-only status.
> 4. In general, more rare events get higher precedence.
> This is proposed on top of KAFKA-12352. Any comments on the precedence rules 
> / categorization 

[GitHub] [kafka] rondagostino commented on pull request #10297: MINOR: fix failing ZooKeeper system tests

2021-03-16 Thread GitBox


rondagostino commented on pull request #10297:
URL: https://github.com/apache/kafka/pull/10297#issuecomment-800653669


   Yeah, there never has been a clear delineation between "what SASL mechanisms 
are enabled for Kafka" vs. "what SASL mechanisms are enabled for ZooKeeper".  I 
had to tease this apart for Kafka brokers vs. Kafka Raft controllers (see 
`serves_raft_sasl` and `uses_raft_sasl`).  The same kind of teasing apart could 
be done for Kafka vs. ZooKeeper as well.  Perhaps we can open a ticket for this 
and leave it for another time?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

2021-03-16 Thread GitBox


jolshan commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-800648300


   @chia7712 is there any work in progress for a KafkaApis.handleFetchRequest 
test? I suspect it would be similar but maybe a bit harder than what I did for 
the LeaderAndIsr version https://github.com/apache/kafka/pull/10071 (trading 
replicamanager for fetchmanager, etc). This benchmark would be helpful for 
https://github.com/apache/kafka/pull/9944 as you could probably guess :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10297: MINOR: fix failing ZooKeeper system tests

2021-03-16 Thread GitBox


cmccabe commented on pull request #10297:
URL: https://github.com/apache/kafka/pull/10297#issuecomment-800646841


   I'm struggling a bit to understand whether enabled_sasl_mechanisms is 
supposed to be for the broker or for ZK?
   
   It seems like you're treating it like it's for both, but then in a few other 
places, like admin_client_as_broker_jaas.conf, it seems to be just broker.  Or 
did I miss something here?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #10292: MINOR: fix client_compatibility_features_test.py

2021-03-16 Thread GitBox


rondagostino commented on pull request #10292:
URL: https://github.com/apache/kafka/pull/10292#issuecomment-800644753


   I broke this with 
https://github.com/apache/kafka/pull/10105/files#diff-84e14a0909d232b70f0a957ded161cd077d4dc1d069bbaab8e1bacc8dd2e0572L84-R85.
  The test used to always use the ZooKeeper node, and I changed it to use the 
Kafka node not realizing that the two distribution versions would be different. 
 We want it to always use at least version 2.7, which we can get by using 
either the ZooKeeper node for the ZooKeeper case or the Kafka node for the Raft 
case.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12483) Enable client overrides in connector configs by default

2021-03-16 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302933#comment-17302933
 ] 

Randall Hauch commented on KAFKA-12483:
---

The draft PR implements the changes proposed in 
[KIP-722|https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default].
 They should not be merged until the KIP is approved.

> Enable client overrides in connector configs by default
> ---
>
> Key: KAFKA-12483
> URL: https://issues.apache.org/jira/browse/KAFKA-12483
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Connector-specific client overrides were added in 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy],
>  but that feature is not enabled by default since it would not have been 
> backward compatible.
> But with AK 3.0.0, we have the opportunity to enable connector client 
> overrides by default by changing the worker config's 
> {{connector.client.config.override.policy}} default value to {{All}}.
> See 
> [KIP-722|https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12484) Enable Connect's connector log contexts by default

2021-03-16 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302932#comment-17302932
 ] 

Randall Hauch commented on KAFKA-12484:
---

The draft PR implements the changes proposed in 
[KIP-721|https://cwiki.apache.org/confluence/display/KAFKA/KIP-721%3A+Enable+connector+log+contexts+in+Connect+Log4j+configuration].
 They should not be merged until the KIP is approved.

> Enable Connect's connector log contexts by default
> --
>
> Key: KAFKA-12484
> URL: https://issues.apache.org/jira/browse/KAFKA-12484
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Connect's Log4J configuration does not by default log the connector contexts. 
> That feature was added in 
> [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs]
>  and first appeared in AK 2.3.0, but it was not enabled by default since that 
> would not have been backward compatible.
> But with AK 3.0.0, we have the opportunity to change the default in 
> {{config/connect-log4j.properties}} to enable connector log contexts.
> See 
> [KIP-721|https://cwiki.apache.org/confluence/display/KAFKA/KIP-721%3A+Enable+connector+log+contexts+in+Connect+Log4j+configuration].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch opened a new pull request #10336: KAFKA-12483: Enable client overrides in connector configs by default (KIP-722)

2021-03-16 Thread GitBox


rhauch opened a new pull request #10336:
URL: https://github.com/apache/kafka/pull/10336


   **DO NOT MERGE until after 
[KIP-722](https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default)
 has been approved!**
   
   Changes the default value for the `connector.client.config.override.policy` 
worker configuration property from `None` to `All`. Modified unit tests to 
verify all policies still work, and that by default connectors can override all 
client policies.
   
   See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12486) Utilize HighAvailabilityTaskAssignor to avoid downtime on corrupted task

2021-03-16 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12486:
---
Priority: Critical  (was: Major)

> Utilize HighAvailabilityTaskAssignor to avoid downtime on corrupted task
> 
>
> Key: KAFKA-12486
> URL: https://issues.apache.org/jira/browse/KAFKA-12486
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>
> In KIP-441, we added the HighAvailabilityTaskAssignor to address certain 
> common scenarios which tend to lead to heavy downtime for tasks, such as 
> scaling out. The new assignor will always place an active task on a client 
> which has a "caught-up" copy of that task's state, if any exists, while the 
> intended recipient will instead get a standby task to warm up the state in 
> the background. This way we keep tasks live as much as possible, and avoid 
> the long downtime imposed by state restoration on active tasks.
> We can actually expand on this to reduce downtime due to restoring state: 
> specifically, we may throw a TaskCorruptedException on an active task which 
> leads to wiping out the state stores of that task and restoring from scratch. 
> There are a few cases where this may be thrown:
>  # No checkpoint found with EOS
>  # TimeoutException when processing a StreamTask
>  # TimeoutException when committing offsets under eos
>  # RetriableException in RecordCollectorImpl
> (There is also the case of OffsetOutOfRangeException, but that is excluded 
> here since it only applies to standby tasks).
> We should consider triggering a rebalance when we hit TaskCorruptedException 
> on an active task so that the assignor has the chance to redirect this to 
> another client who can resume work on the task while the original owner works 
> on restoring the state from scratch.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12486) Utilize HighAvailabilityTaskAssignor to avoid downtime on corrupted task

2021-03-16 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12486:
--

 Summary: Utilize HighAvailabilityTaskAssignor to avoid downtime on 
corrupted task
 Key: KAFKA-12486
 URL: https://issues.apache.org/jira/browse/KAFKA-12486
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


In KIP-441, we added the HighAvailabilityTaskAssignor to address certain common 
scenarios which tend to lead to heavy downtime for tasks, such as scaling out. 
The new assignor will always place an active task on a client which has a 
"caught-up" copy of that tasks' state, if any exists, while the intended 
recipient will instead get a standby task to warm up the state in the 
background. This way we keep tasks live as much as possible, and avoid the long 
downtime imposed by state restoration on active tasks.

We can actually expand on this to reduce downtime due to restoring state: 
specifically, we may throw a TaskCorruptedException on an active task which 
leads to wiping out the state stores of that task and restoring from scratch. 
There are a few cases where this may be thrown:
 # No checkpoint found with EOS
 # TimeoutException when processing a StreamTask
 # TimeoutException when committing offsets under eos
 # RetriableException in RecordCollectorImpl

(There is also the case of OffsetOutOfRangeException, but that is excluded here 
since it only applies to standby tasks).

We should consider triggering a rebalance when we hit TaskCorruptedException on 
an active task so that the assignor has the chance to redirect this to another 
client who can resume work on the task while the original owner works on 
restoring the state from scratch.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12483) Enable client overrides in connector configs by default

2021-03-16 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reassigned KAFKA-12483:
-

Assignee: Randall Hauch

> Enable client overrides in connector configs by default
> ---
>
> Key: KAFKA-12483
> URL: https://issues.apache.org/jira/browse/KAFKA-12483
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Connector-specific client overrides were added in 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy],
>  but that feature is not enabled by default since it would not have been 
> backward compatible.
> But with AK 3.0.0, we have the opportunity to enable connector client 
> overrides by default by changing the worker config's 
> {{connector.client.config.override.policy}} default value to {{All}}.
> See 
> [KIP-722|https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12483) Enable client overrides in connector configs by default

2021-03-16 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-12483:
--
Description: 
Connector-specific client overrides were added in 
[KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy],
 but that feature is not enabled by default since it would not have been 
backward compatible.

But with AK 3.0.0, we have the opportunity to enable connector client overrides 
by default by changing the worker config's 
{{connector.client.config.override.policy}} default value to {{All}}.

See 
[KIP-722|https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default].

  was:
Connector-specific client overrides were added in 
[KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy],
 but that feature is not enabled by default since it would not have been 
backward compatible.

But with AK 3.0.0, we have the opportunity to enable connector client overrides 
by default by changing the worker config's 
{{connector.client.config.override.policy}} default value to \{{All}}.


> Enable client overrides in connector configs by default
> ---
>
> Key: KAFKA-12483
> URL: https://issues.apache.org/jira/browse/KAFKA-12483
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Connector-specific client overrides were added in 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy],
>  but that feature is not enabled by default since it would not have been 
> backward compatible.
> But with AK 3.0.0, we have the opportunity to enable connector client 
> overrides by default by changing the worker config's 
> {{connector.client.config.override.policy}} default value to {{All}}.
> See 
> [KIP-722|https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12484) Enable Connect's connector log contexts by default

2021-03-16 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-12484:
--
Description: 
Connect's Log4J configuration does not by default log the connector contexts. 
That feature was added in 
[KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs]
 and first appeared in AK 2.3.0, but it was not enabled by default since that 
would not have been backward compatible.

But with AK 3.0.0, we have the opportunity to change the default in 
{{config/connect-log4j.properties}} to enable connector log contexts.

See 
[KIP-721|https://cwiki.apache.org/confluence/display/KAFKA/KIP-721%3A+Enable+connector+log+contexts+in+Connect+Log4j+configuration].

  was:
Connect's Log4J configuration does not by default log the connector contexts. 
That feature was added in 
[KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs]
 and first appeared in AK 2.3.0, but it was not enabled by default since that 
would not have been backward compatible.

But with AK 3.0.0, we have the opportunity to change the default in 
{{config/connect-log4j.properties}} to enable connector log contexts.


> Enable Connect's connector log contexts by default
> --
>
> Key: KAFKA-12484
> URL: https://issues.apache.org/jira/browse/KAFKA-12484
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Connect's Log4J configuration does not by default log the connector contexts. 
> That feature was added in 
> [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs]
>  and first appeared in AK 2.3.0, but it was not enabled by default since that 
> would not have been backward compatible.
> But with AK 3.0.0, we have the opportunity to change the default in 
> {{config/connect-log4j.properties}} to enable connector log contexts.
> See 
> [KIP-721|https://cwiki.apache.org/confluence/display/KAFKA/KIP-721%3A+Enable+connector+log+contexts+in+Connect+Log4j+configuration].
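For illustration (a hypothetical snippet, not the actual patch): enabling the KIP-449 
connector context by default essentially means including the {{connector.context}} MDC 
value in the Log4j pattern shipped in {{config/connect-log4j.properties}}, e.g.:

{noformat}
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
{noformat}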



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch opened a new pull request #10335: KAFKA-12484: Enable Connect's connector log contexts by default (KIP-721) DRAFT

2021-03-16 Thread GitBox


rhauch opened a new pull request #10335:
URL: https://github.com/apache/kafka/pull/10335


   **DO NOT MERGE until after 
[KIP-721](https://cwiki.apache.org/confluence/display/KAFKA/KIP-721%3A+Enable+connector+log+contexts+in+Connect+Log4j+configuration)
 has been approved!**
   
   Change the `connect-log4j.properties` file to use the connector log context 
by default.
   This feature was previously added in KIP-449, but was not enabled by default.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10334: MINOR: Fix BaseHashTable sizing

2021-03-16 Thread GitBox


ijuma commented on a change in pull request #10334:
URL: https://github.com/apache/kafka/pull/10334#discussion_r595558838



##
File path: metadata/src/main/java/org/apache/kafka/timeline/BaseHashTable.java
##
@@ -56,12 +58,30 @@
 this.elements = new Object[expectedSizeToCapacity(expectedSize)];
 }
 
+/**
+ * Calculate the capacity we should provision, given the expected size.
+ *
+ * Our capacity must always be a power of 2, and never less than 2.
+ */
 static int expectedSizeToCapacity(int expectedSize) {
-if (expectedSize <= 1) {
-return 2;
+if (expectedSize >= MAX_CAPACITY / 2) {
+return MAX_CAPACITY;
+}
+return Math.max(MIN_CAPACITY, roundUpToPowerOfTwo(expectedSize * 2));
+}
+
+private static int roundUpToPowerOfTwo(int i) {
+if (i < 0) {
+return 0;
 }
-double sizeToFit = expectedSize / MAX_LOAD_FACTOR;
-return (int) Math.min(MAX_CAPACITY, Math.ceil(Math.log(sizeToFit) / 
LN_2));
+i = i - 1;
+i |= i >> 1;
+i |= i >> 2;
+i |= i >> 4;
+i |= i >> 8;
+i |= i >> 16;
+i = i + 1;
+return i < 0 ? MAX_CAPACITY : i;

Review comment:
   Can you do something like:
   
   ```java
   static final int tableSizeFor(int cap) {
   int n = -1 >>> Integer.numberOfLeadingZeros(cap - 1);
   return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
   }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yatsukav commented on a change in pull request #10333: KAFKA-12481: Add socket.nagle.disable config property

2021-03-16 Thread GitBox


yatsukav commented on a change in pull request #10333:
URL: https://github.com/apache/kafka/pull/10333#discussion_r595557313



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -391,6 +392,7 @@ object KafkaConfig {
   val AdvertisedListenersProp = "advertised.listeners"
   val ListenerSecurityProtocolMapProp = "listener.security.protocol.map"
   val ControlPlaneListenerNameProp = "control.plane.listener.name"
+  val SocketNagleDisableProp = "socket.nagle.disable"

Review comment:
   I took `socket.nagle.disable` from the librdkafka project. But OK, I will rename 
it to `socket.tcp.no.delay`. Perhaps this is clearer. Thank you for the review.
   
   I request KIP creation permission on d...@kafka.apache.org.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yatsukav commented on a change in pull request #10333: KAFKA-12481: Add socket.nagle.disable config property

2021-03-16 Thread GitBox


yatsukav commented on a change in pull request #10333:
URL: https://github.com/apache/kafka/pull/10333#discussion_r595557313



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -391,6 +392,7 @@ object KafkaConfig {
   val AdvertisedListenersProp = "advertised.listeners"
   val ListenerSecurityProtocolMapProp = "listener.security.protocol.map"
   val ControlPlaneListenerNameProp = "control.plane.listener.name"
+  val SocketNagleDisableProp = "socket.nagle.disable"

Review comment:
   I took `socket.nagle.disable` from the librdkafka project. But OK, I will rename 
it to `socket.tcp.no.delay`. Perhaps this is clearer.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #10334: MINOR: Fix BaseHashTable sizing

2021-03-16 Thread GitBox


cmccabe opened a new pull request #10334:
URL: https://github.com/apache/kafka/pull/10334


   The array backing BaseHashTable is intended to be sized as a power of
   two.  Due to a bug, the initial array size was calculated incorrectly
   in some cases.
   
   Also make the maximum array size the largest possible 31-bit power of
   two.  Previously it was a smaller size, but this was due to a typo.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status

2021-03-16 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302889#comment-17302889
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12472:


{quote} illegal generation id is usually due to a new member with the same id 
replaced an old member and then later the old zombie member tries to talk to 
coordinator (though with the new model, this should mostly be for static 
members).
{quote}
Makes sense – but just wondering, what are you referring to here by "new model"?
{quote}Re. 3: My reasoning is that if a new member joins the group and triggers 
the rebalance, then from observability point of view we would like to know it's 
due to "NewMember", not "CoordinatorRequested", since the latter is actually 
not the "root" cause. This applies to both consumer and streams.
{quote}
The motivation here sounds good, as the "CoordinatorRequested" state doesn't 
really tell you why it's rebalancing – but it does seem to potentially violate 
the transition rule that "a non-zero status code can only transit to zero or to 
a higher code". What if you have a new member join and then some other consumer 
in another client triggers a rebalance, causing the new member to get the 
RebalanceInProgressException before it's able to complete the first rebalance. 
Shouldn't it then transition from *NewMember* --> *CoordinatorRequested* ? But 
this transition is disallowed under this rule.

Similarly, it seems like a single StreamThread could go from any of the 
Streams-specific states to almost any of the consumer states, e.g. 
*AssignmentProbing* --> *DroppedGroup*. I guess I'm wondering why we need 
this transition rule in the first place?

> Add a Consumer / Streams metric to indicate the current rebalance status
> 
>
> Key: KAFKA-12472
> URL: https://issues.apache.org/jira/browse/KAFKA-12472
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> Today to trouble shoot a rebalance issue operators need to do a lot of manual 
> steps: locating the problematic members, search in the log entries, and look 
> for related metrics. It would be great to add a single metric that covers all 
> these manual steps and operators would only need to check this single signal 
> to check what is the root cause. A concrete idea is to expose two enum gauge 
> metrics on consumer and streams, respectively:
> * Consumer level (the order below is by-design, see Streams level for 
> details):
>   0. *None* => there is no rebalance on going.
>   1. *CoordinatorRequested* => any of the coordinator response contains a 
> RebalanceInProgress error code.
>   2. *NewMember* => when the join group response has a MemberIdRequired error 
> code.
>   3. *UnknownMember* => when any of the coordinator response contains an 
> UnknownMember error code, indicating this member is already kicked out of the 
> group.
>   4. *StaleMember* => when any of the coordinator response contains an 
> IllegalGeneration error code.
>   5. *DroppedGroup* => when hb thread decides to call leaveGroup due to hb 
> expired.
>   6. *UserRequested* => when leaveGroup upon the shutdown / unsubscribeAll 
> API, as well as upon calling the enforceRebalance API.
>   7. *MetadataChanged* => requestRejoin triggered since metadata has changed.
>   8. *SubscriptionChanged* => requestRejoin triggered since subscription has 
> changed.
>   9. *RetryOnError* => when join/syncGroup response contains a retriable 
> error which would cause the consumer to backoff and retry.
>  10. *RevocationNeeded* => requestRejoin triggered since revoked partitions 
> is not empty.
> The transition rule is that a non-zero status code can only transit to zero 
> or to a higher code, but not to a lower code (same for streams, see 
> rationales below).
> * Streams level: today a streams client can have multiple consumers. We 
> introduced some new enum states as well as aggregation rules across 
> consumers: if there are no streams-layer events (listed below) that transition its 
> status (i.e. the streams layer thinks it is 0), then we aggregate across all the 
> embedded consumers and take the largest status code value as the streams 
> metric; if there are streams-layer events that determine its status should 
> be 10+, then it ignores all embedded consumer-layer status codes since it 
> should have a higher precedence. In addition, when creating an aggregated metric 
> across streams instances (a.k.a. at the app level, which is usually what we 
> would care about and alert on), we also follow the same aggregation rule, e.g. if 
> there are two streams instances where one instance's status code is 1) and 
> the other is 10), then the app's status is 10).
>  10. 
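A minimal sketch of the proposed consumer-level gauge values (names and codes taken 
from the list above; hypothetical, no such enum exists in Kafka today):

{noformat}
public enum RebalanceStatus {
    NONE(0), COORDINATOR_REQUESTED(1), NEW_MEMBER(2), UNKNOWN_MEMBER(3),
    STALE_MEMBER(4), DROPPED_GROUP(5), USER_REQUESTED(6), METADATA_CHANGED(7),
    SUBSCRIPTION_CHANGED(8), RETRY_ON_ERROR(9), REVOCATION_NEEDED(10);

    public final int code;

    RebalanceStatus(int code) {
        this.code = code;
    }

    // Proposed transition rule: a non-zero status may only move back to zero
    // or on to a strictly higher code.
    public boolean canTransitionTo(RebalanceStatus target) {
        return target.code == 0 || target.code > this.code;
    }
}
{noformat}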

[GitHub] [kafka] ijuma merged pull request #10322: KAFKA-12455: OffsetValidationTest.test_broker_rolling_bounce fail: Raft

2021-03-16 Thread GitBox


ijuma merged pull request #10322:
URL: https://github.com/apache/kafka/pull/10322


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10330: MINOR: Add toString to various Kafka Metrics classes

2021-03-16 Thread GitBox


ijuma merged pull request #10330:
URL: https://github.com/apache/kafka/pull/10330


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10330: MINOR: Add toString to various Kafka Metrics classes

2021-03-16 Thread GitBox


ijuma commented on pull request #10330:
URL: https://github.com/apache/kafka/pull/10330#issuecomment-800596069


   Failures are unrelated.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12484) Enable Connect's connector log contexts by default

2021-03-16 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reassigned KAFKA-12484:
-

Assignee: Randall Hauch

> Enable Connect's connector log contexts by default
> --
>
> Key: KAFKA-12484
> URL: https://issues.apache.org/jira/browse/KAFKA-12484
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Connect's Log4J configuration does not by default log the connector contexts. 
> That feature was added in 
> [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs]
>  and first appeared in AK 2.3.0, but it was not enabled by default since that 
> would not have been backward compatible.
> But with AK 3.0.0, we have the opportunity to change the default in 
> {{config/connect-log4j.properties}} to enable connector log contexts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-03-16 Thread GitBox


kowshik commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r595493561



##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemotePartitionDeleteState.java
##
@@ -83,4 +85,25 @@ public static RemotePartitionDeleteState forId(byte id) {
 return STATE_TYPES.get(id);
 }
 
+public static boolean isValidTransition(RemotePartitionDeleteState 
srcState,

Review comment:
   I have the same suggestions from `RemoteLogSegmentState` for this as 
well. Please refer to this comment: 
https://github.com/apache/kafka/pull/10218#discussion_r595492577





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10333: KAFKA-12481: Add socket.nagle.disable config property

2021-03-16 Thread GitBox


ijuma commented on a change in pull request #10333:
URL: https://github.com/apache/kafka/pull/10333#discussion_r595513484



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -391,6 +392,7 @@ object KafkaConfig {
   val AdvertisedListenersProp = "advertised.listeners"
   val ListenerSecurityProtocolMapProp = "listener.security.protocol.map"
   val ControlPlaneListenerNameProp = "control.plane.listener.name"
+  val SocketNagleDisableProp = "socket.nagle.disable"

Review comment:
   I think I would call this `socket.tcp.no.delay`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-03-16 Thread GitBox


kowshik commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r595492577



##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
##
@@ -87,4 +89,27 @@ public byte id() {
 public static RemoteLogSegmentState forId(byte id) {
 return STATE_TYPES.get(id);
 }
+
+public static boolean isValidTransition(RemoteLogSegmentState srcState, 
RemoteLogSegmentState targetState) {
+Objects.requireNonNull(targetState, "targetState can not be null");
+
+if (srcState == null) {
+// If the source state is null, check the target state as the 
initial state viz DELETE_PARTITION_MARKED
+// Wanted to keep this logic simple here by taking null for 
srcState, instead of creating one more state like
+// COPY_SEGMENT_NOT_STARTED and have the null check by caller and 
pass that state.
+return targetState == COPY_SEGMENT_STARTED;
+} else if (srcState == targetState) {

Review comment:
   1. Will it be useful to place the implementation of this validation in a 
separate module, so that it can be reused with `RLMMWithTopicStorage` in the 
future?
   2. Suggestion from the standpoint of code readability: Would it make sense 
to replace the `if-else` logic with a lookup from a 
`Map<RemoteLogSegmentState, Set<RemoteLogSegmentState>>`, where the key is the source 
state and the value is the set of allowed target states?
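   Something like the following (a rough sketch only; the actual transition sets 
would have to mirror whatever the current `if-else` chain allows, which is not fully 
visible in this diff):
   
   ```java
   // Hypothetical lookup-table variant of isValidTransition.
   // (Assumes java.util.Map / java.util.Set and the enum constants are imported.)
   private static final Map<RemoteLogSegmentState, Set<RemoteLogSegmentState>> VALID_TRANSITIONS =
       Map.of(
           COPY_SEGMENT_STARTED, Set.of(COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED),
           COPY_SEGMENT_FINISHED, Set.of(DELETE_SEGMENT_STARTED),
           DELETE_SEGMENT_STARTED, Set.of(DELETE_SEGMENT_FINISHED),
           DELETE_SEGMENT_FINISHED, Set.of());
   
   public static boolean isValidTransition(RemoteLogSegmentState srcState, RemoteLogSegmentState targetState) {
       Objects.requireNonNull(targetState, "targetState can not be null");
       if (srcState == null) {
           // a null source still means "initial state", as in the existing code
           return targetState == COPY_SEGMENT_STARTED;
       }
       return VALID_TRANSITIONS.getOrDefault(srcState, Set.of()).contains(targetState);
   }
   ```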
   
   

##
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class is an implementation of {@link RemoteLogMetadataManager} backed 
by inmemory store.
+ */
+public class InmemoryRemoteLogMetadataManager implements 
RemoteLogMetadataManager {

Review comment:
   We may want to think more about the locking semantics for this class and 
`RemoteLogMetadataCache`. 
   Are we sure there would _not_ be use cases where we need to serialize 
mutations across the individually thread-safe attributes? If the answer is no, 
then using a fine-grained `Object` lock makes more sense because we can use it 
to guard critical sections.
   
   Should we evaluate this upfront?
   
   cc @junrao 

##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemotePartitionDeleteState.java
##
@@ -83,4 +85,25 @@ public static RemotePartitionDeleteState forId(byte id) {
 return STATE_TYPES.get(id);
 }
 
+public static boolean isValidTransition(RemotePartitionDeleteState 
srcState,

Review comment:
   I have the same suggestions from `RemoteLogSegmentState` for this as 
well. Please refer to this comment:

##
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the 

[GitHub] [kafka] ijuma commented on pull request #10333: KAFKA-12481: Add socket.nagle.disable config property

2021-03-16 Thread GitBox


ijuma commented on pull request #10333:
URL: https://github.com/apache/kafka/pull/10333#issuecomment-800568236


   Nice find. Since this introduces a new config, it would require a KIP:
   https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions

2021-03-16 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12485:
---
Labels: newbie++  (was: )

> Speed up Consumer#committed by returning cached offsets for owned partitions
> 
>
> Key: KAFKA-12485
> URL: https://issues.apache.org/jira/browse/KAFKA-12485
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie++
>
> All of the KafkaConsumer#committed APIs will currently make a remote blocking 
> call to the server to fetch the committed offsets. This is typically used to 
> reset the offsets after a crash or restart, or to fetch offsets for other 
> consumers in the group. However some users may wish to invoke this API on 
> partitions which are currently owned by the Consumer, in which case the 
> remote call is unnecessary since those offsets should already be known.
> We should consider optimizing these APIs to just return the cached offsets in 
> place of the remote call when passed in only partitions that are currently 
> owned. This is similar to what we do in Consumer#position, although there we 
> have a guarantee that the partitions are owned by the Consumer whereas in 
> #committed we do not



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions

2021-03-16 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12485:
--

 Summary: Speed up Consumer#committed by returning cached offsets 
for owned partitions
 Key: KAFKA-12485
 URL: https://issues.apache.org/jira/browse/KAFKA-12485
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: A. Sophie Blee-Goldman


All of the KafkaConsumer#committed APIs will currently make a remote blocking 
call to the server to fetch the committed offsets. This is typically used to 
reset the offsets after a crash or restart, or to fetch offsets for other 
consumers in the group. However some users may wish to invoke this API on 
partitions which are currently owned by the Consumer, in which case the remote 
call is unnecessary since those offsets should already be known.

We should consider optimizing these APIs to just return the cached offsets in 
place of the remote call when passed in only partitions that are currently 
owned. This is similar to what we do in Consumer#position, although there we 
have a guarantee that the partitions are owned by the Consumer whereas in 
#committed we do not
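A rough sketch of the shape such a short-circuit could take (hypothetical and 
simplified; `ownedCommits` stands in for whatever per-partition commit state the 
consumer already tracks, and the real change would live inside the consumer/coordinator 
internals):

{noformat}
static Map<TopicPartition, OffsetAndMetadata> committed(
        Set<TopicPartition> requested,
        Map<TopicPartition, OffsetAndMetadata> ownedCommits,
        Function<Set<TopicPartition>, Map<TopicPartition, OffsetAndMetadata>> remoteFetch) {
    if (ownedCommits.keySet().containsAll(requested)) {
        // Every requested partition is owned and has a locally known commit:
        // answer from the cache and skip the remote OffsetFetch round trip.
        Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
        for (TopicPartition tp : requested)
            result.put(tp, ownedCommits.get(tp));
        return result;
    }
    // Otherwise fall back to the existing remote lookup.
    return remoteFetch.apply(requested);
}
{noformat}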



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12482) Remove deprecated Connect worker configs

2021-03-16 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-12482:
--
Description: 
The following Connect worker configuration properties were deprecated and 
should be removed in 3.0.0:
 * {{rest.host.name}} (deprecated in 
[KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface])

 * {{rest.port}} (deprecated in 
[KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface])
 * {{internal.key.converter}} (deprecated in 
[KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig])
 * {{internal.value.converter}} (deprecated in 
[KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig])

  was:
The following Connect worker configuration properties were deprecated and 
should be removed in 3.0.0:
 * {{rest.host.name}} (deprecated in 
[KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface])

 * {{rest.port}} (deprecated in 
[KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface])
 * {{internal.key.converter}} (deprecated in 
[KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig])
 * {{internal.value.converter}} (deprecated in 
[KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig])
 * sd
 *


> Remove deprecated Connect worker configs
> 
>
> Key: KAFKA-12482
> URL: https://issues.apache.org/jira/browse/KAFKA-12482
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Priority: Critical
> Fix For: 3.0.0
>
>
> The following Connect worker configuration properties were deprecated and 
> should be removed in 3.0.0:
>  * {{rest.host.name}} (deprecated in 
> [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface])
>  * {{rest.port}} (deprecated in 
> [KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface])
>  * {{internal.key.converter}} (deprecated in 
> [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig])
>  * {{internal.value.converter}} (deprecated in 
> [KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig])



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12484) Enable Connect's connector log contexts by default

2021-03-16 Thread Randall Hauch (Jira)
Randall Hauch created KAFKA-12484:
-

 Summary: Enable Connect's connector log contexts by default
 Key: KAFKA-12484
 URL: https://issues.apache.org/jira/browse/KAFKA-12484
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Randall Hauch
 Fix For: 3.0.0


Connect's Log4J configuration does not by default log the connector contexts. 
That feature was added in 
[KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs]
 and first appeared in AK 2.3.0, but it was not enabled by default since that 
would not have been backward compatible.

But with AK 3.0.0, we have the opportunity to change the default in 
{{config/connect-log4j.properties}} to enable connector log contexts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12482) Remove deprecated Connect worker configs

2021-03-16 Thread Randall Hauch (Jira)
Randall Hauch created KAFKA-12482:
-

 Summary: Remove deprecated Connect worker configs
 Key: KAFKA-12482
 URL: https://issues.apache.org/jira/browse/KAFKA-12482
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Randall Hauch
 Fix For: 3.0.0


The following Connect worker configuration properties were deprecated and 
should be removed in 3.0.0:
 * {{rest.host.name}} (deprecated in 
[KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface])

 * {{rest.port}} (deprecated in 
[KIP-208|https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface])
 * {{internal.key.converter}} (deprecated in 
[KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig])
 * {{internal.value.converter}} (deprecated in 
[KIP-174|https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig])
 * sd
 *



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12483) Enable client overrides in connector configs by default

2021-03-16 Thread Randall Hauch (Jira)
Randall Hauch created KAFKA-12483:
-

 Summary: Enable client overrides in connector configs by default
 Key: KAFKA-12483
 URL: https://issues.apache.org/jira/browse/KAFKA-12483
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Randall Hauch
 Fix For: 3.0.0


Connector-specific client overrides were added in 
[KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy],
 but that feature is not enabled by default since it would not have been 
backward compatible.

But with AK 3.0.0, we have the opportunity to enable connector client overrides 
by default by changing the worker config's 
{{connector.client.config.override.policy}} default value to \{{All}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #10324: MINOR: Add a few more benchmark for the timeline map

2021-03-16 Thread GitBox


jsancio commented on a change in pull request #10324:
URL: https://github.com/apache/kafka/pull/10324#discussion_r595483066



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
##
@@ -44,33 +49,126 @@
 public class TimelineHashMapBenchmark {
 private final static int NUM_ENTRIES = 1_000_000;
 
+@State(Scope.Thread)
+public static class HashMapInput {
+public HashMap<Integer, String> map;
+public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+map = new HashMap<>(keys.size());
+for (Integer key : keys) {
+map.put(key, String.valueOf(key));
+}
+
+Collections.shuffle(keys);
+}
+}
+
+@State(Scope.Thread)
+public static class ImmutableMapInput {
+scala.collection.immutable.HashMap<Integer, String> map;
+public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+map = new scala.collection.immutable.HashMap<>();
+for (Integer key : keys) {
+map = map.updated(key, String.valueOf(key));
+}
+
+Collections.shuffle(keys);
+}
+}
+
+@State(Scope.Thread)
+public static class TimelineMapInput {
+public SnapshotRegistry snapshotRegistry;
+public TimelineHashMap<Integer, String> map;
+public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+snapshotRegistry = new SnapshotRegistry(new LogContext());
+map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+
+for (Integer key : keys) {
+map.put(key, String.valueOf(key));
+}
+
+Collections.shuffle(keys);
+}
+}
+
+@State(Scope.Thread)
+public static class TimelineMapSnapshotInput {
+public SnapshotRegistry snapshotRegistry;
+public TimelineHashMap<Integer, String> map;
+public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+snapshotRegistry = new SnapshotRegistry(new LogContext());
+map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+
+for (Integer key : keys) {
+map.put(key, String.valueOf(key));
+}
+
+int count = 0;
+for (Integer key : keys) {
+if (count % 1_000 == 0) {
+snapshotRegistry.deleteSnapshotsUpTo(count - 10_000);
+snapshotRegistry.createSnapshot(count);
+}
+map.put(key, String.valueOf(key));
+count++;
+}
+
+Collections.shuffle(keys);
+}
+}
+
+
 @Benchmark
 public Map<Integer, String> testAddEntriesInHashMap() {
-HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES);
+HashMap<Integer, String> map = new HashMap<>();
 for (int i = 0; i < NUM_ENTRIES; i++) {
 int key = (int) (0xffffffffL & ((i * 2862933555777941757L) + 
3037000493L));
 map.put(key, String.valueOf(key));
 }
+
+return map;
+}
+
+@Benchmark
+public scala.collection.immutable.HashMap<Integer, String> 
testAddEntriesInImmutableMap() {
+scala.collection.immutable.HashMap<Integer, String> map = new 
scala.collection.immutable.HashMap<>();
+for (int i = 0; i < NUM_ENTRIES; i++) {
+int key = (int) (0xffffffffL & ((i * 2862933555777941757L) + 
3037000493L));
+map = map.updated(key, String.valueOf(key));

Review comment:
   Good catch. It looks like we were mostly measuring converting an int to a 
String!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12330) FetchSessionCache may cause starvation for partitions when FetchResponse is full

2021-03-16 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-12330.

Fix Version/s: 3.0.0
   Resolution: Fixed

> FetchSessionCache may cause starvation for partitions when FetchResponse is 
> full
> 
>
> Key: KAFKA-12330
> URL: https://issues.apache.org/jira/browse/KAFKA-12330
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Bradstreet
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.0.0
>
>
> The incremental FetchSessionCache deprioritizes partitions for which a 
> response is returned. This may happen if log metadata such as log start 
> offset, hwm, etc is returned, or if data for that partition is returned.
> When a fetch response fills to maxBytes, data may not be returned for 
> partitions even if the fetch offset is lower than the fetch upper bound. 
> However, the fetch response will still contain updates to metadata such as 
> hwm if that metadata has changed. This can lead to degenerate behavior where 
> a partition's hwm or log start offset is updated resulting in the next fetch 
> being unnecessarily skipped for that partition. At first this appeared to be 
> worse, as hwm updates occur frequently, but starvation should result in hwm 
> movement becoming blocked, allowing a fetch to go through and then becoming 
> unstuck. However, it'll still require one more fetch request than necessary 
> to do so. Consumers may be affected more than replica fetchers, however they 
> often remove partitions with fetched data from the next fetch request and 
> this may be helping prevent starvation.
> I believe we should only reorder the partition fetch priority if data is 
> actually returned for a partition.
> {noformat}
> private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
> val updateFetchContextAndRemoveUnselected: 
> Boolean)
>   extends FetchSession.RESP_MAP_ITER {
>   var nextElement: util.Map.Entry[TopicPartition, 
> FetchResponse.PartitionData[Records]] = null
>   override def hasNext: Boolean = {
> while ((nextElement == null) && iter.hasNext) {
>   val element = iter.next()
>   val topicPart = element.getKey
>   val respData = element.getValue
>   val cachedPart = session.partitionMap.find(new 
> CachedPartition(topicPart))
>   val mustRespond = cachedPart.maybeUpdateResponseData(respData, 
> updateFetchContextAndRemoveUnselected)
>   if (mustRespond) {
> nextElement = element
> // Example POC change:
> // Don't move partition to end of queue if we didn't actually fetch 
> data
> // This should help avoid starvation even when we are filling the 
> fetch response fully while returning metadata for these partitions
> if (updateFetchContextAndRemoveUnselected && respData.records != null 
> && respData.records.sizeInBytes > 0) {
>   session.partitionMap.remove(cachedPart)
>   session.partitionMap.mustAdd(cachedPart)
> }
>   } else {
> if (updateFetchContextAndRemoveUnselected) {
>   iter.remove()
> }
>   }
> }
> nextElement != null
>   }{noformat}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram merged pull request #10318: KAFKA-12330; FetchSessionCache may cause starvation for partitions when FetchResponse is full

2021-03-16 Thread GitBox


rajinisivaram merged pull request #10318:
URL: https://github.com/apache/kafka/pull/10318


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #10318: KAFKA-12330; FetchSessionCache may cause starvation for partitions when FetchResponse is full

2021-03-16 Thread GitBox


rajinisivaram commented on pull request #10318:
URL: https://github.com/apache/kafka/pull/10318#issuecomment-800540196


   @dajac Thanks for the PR, LGTM. Merging to trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10324: MINOR: Add a few more benchmark for the timeline map

2021-03-16 Thread GitBox


ijuma commented on pull request #10324:
URL: https://github.com/apache/kafka/pull/10324#issuecomment-800538930


   Btw, you can find openjdk benchmarks here: 
https://github.com/openjdk/jdk/tree/master/test/micro/org/openjdk/bench/java/util



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12427) Broker does not close muted idle connections with buffered data

2021-03-16 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-12427.

Fix Version/s: 3.0.0
 Reviewer: Rajini Sivaram
   Resolution: Fixed

> Broker does not close muted idle connections with buffered data
> ---
>
> Key: KAFKA-12427
> URL: https://issues.apache.org/jira/browse/KAFKA-12427
> Project: Kafka
>  Issue Type: Bug
>  Components: core, network
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram merged pull request #10267: KAFKA-12427: Don't update connection idle time for muted connections

2021-03-16 Thread GitBox


rajinisivaram merged pull request #10267:
URL: https://github.com/apache/kafka/pull/10267


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yatsukav opened a new pull request #10333: KAFKA-12481: Add socket.nagle.disable config property

2021-03-16 Thread GitBox


yatsukav opened a new pull request #10333:
URL: https://github.com/apache/kafka/pull/10333


   A large number of topic-partitions on one broker causes a burst in the host's 
packets/sec metric. This pull request allows disabling the TCP_NODELAY socket option 
via Kafka config.
   
   A large number of topic-partitions per broker produces an enormous packet count. For 
example, 30k topic-partitions under load across 4 brokers generate ~150k packets/sec. 
With TCP_NODELAY disabled, this value drops to ~3k packets/sec. 
   More detail on how to reproduce the problem, and the results after the fix, is in JIRA: 
https://issues.apache.org/jira/browse/KAFKA-12481
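   
   For context (generic Java, not the actual patch): Nagle's algorithm is controlled 
per socket via the standard `TCP_NODELAY` option, which Kafka currently enables 
unconditionally; the proposed config would make that flag tunable.
   
   ```java
   import java.net.Socket;
   import java.net.SocketException;
   
   public class NagleToggleExample {
       // tcpNoDelay = true  -> Nagle disabled (today's hard-coded behavior)
       // tcpNoDelay = false -> Nagle enabled; small writes may be coalesced into fewer packets
       static void configure(Socket socket, boolean tcpNoDelay) throws SocketException {
           socket.setTcpNoDelay(tcpNoDelay);
       }
   }
   ```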
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10324: MINOR: Add a few more benchmark for the timeline map

2021-03-16 Thread GitBox


ijuma commented on a change in pull request #10324:
URL: https://github.com/apache/kafka/pull/10324#discussion_r595472914



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
##
@@ -44,33 +49,126 @@
 public class TimelineHashMapBenchmark {
 private final static int NUM_ENTRIES = 1_000_000;
 
+@State(Scope.Thread)
+public static class HashMapInput {
+public HashMap<Integer, String> map;
+public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+map = new HashMap<>(keys.size());
+for (Integer key : keys) {
+map.put(key, String.valueOf(key));
+}
+
+Collections.shuffle(keys);
+}
+}
+
+@State(Scope.Thread)
+public static class ImmutableMapInput {
+scala.collection.immutable.HashMap<Integer, String> map;
+public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+map = new scala.collection.immutable.HashMap<>();
+for (Integer key : keys) {
+map = map.updated(key, String.valueOf(key));
+}
+
+Collections.shuffle(keys);
+}
+}
+
+@State(Scope.Thread)
+public static class TimelineMapInput {
+public SnapshotRegistry snapshotRegistry;
+public TimelineHashMap<Integer, String> map;
+public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+snapshotRegistry = new SnapshotRegistry(new LogContext());
+map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+
+for (Integer key : keys) {
+map.put(key, String.valueOf(key));
+}
+
+Collections.shuffle(keys);
+}
+}
+
+@State(Scope.Thread)
+public static class TimelineMapSnapshotInput {
+public SnapshotRegistry snapshotRegistry;
+public TimelineHashMap<Integer, String> map;
+public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+snapshotRegistry = new SnapshotRegistry(new LogContext());
+map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+
+for (Integer key : keys) {
+map.put(key, String.valueOf(key));
+}
+
+int count = 0;
+for (Integer key : keys) {
+if (count % 1_000 == 0) {
+snapshotRegistry.deleteSnapshotsUpTo(count - 10_000);
+snapshotRegistry.createSnapshot(count);
+}
+map.put(key, String.valueOf(key));
+count++;
+}
+
+Collections.shuffle(keys);
+}
+}
+
+
 @Benchmark
 public Map<Integer, String> testAddEntriesInHashMap() {
-HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES);
+HashMap<Integer, String> map = new HashMap<>();
 for (int i = 0; i < NUM_ENTRIES; i++) {
 int key = (int) (0xffffffffL & ((i * 2862933555777941757L) + 
3037000493L));
 map.put(key, String.valueOf(key));
 }
+
+return map;
+}
+
+@Benchmark
+public scala.collection.immutable.HashMap<Integer, String> 
testAddEntriesInImmutableMap() {
+scala.collection.immutable.HashMap<Integer, String> map = new 
scala.collection.immutable.HashMap<>();
+for (int i = 0; i < NUM_ENTRIES; i++) {
+int key = (int) (0xffffffffL & ((i * 2862933555777941757L) + 
3037000493L));
+map = map.updated(key, String.valueOf(key));
+}
+
 return map;
 }
 
 @Benchmark
 public Map<Integer, String> testAddEntriesInTimelineMap() {
 SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
-TimelineHashMap<Integer, String> map =
-new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES);
+TimelineHashMap<Integer, String> map = new 
TimelineHashMap<>(snapshotRegistry, 16);
 for (int i = 0; i < NUM_ENTRIES; i++) {
 int key = (int) (0xffffffffL & ((i * 2862933555777941757L) + 
3037000493L));

Review comment:
   Hmm, I'd just generate the randoms during set-up and add them to an 
array.
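
   A sketch of that idea (not the exact change): pre-compute the keys once per trial 
in a JMH state object so the measured loop only exercises the map.
   
   ```java
   // Hypothetical @State holding pre-generated keys; benchmarks then iterate this
   // array, keeping key generation out of the measurement.
   @State(Scope.Thread)
   public static class PreGeneratedKeys {
       int[] keys;
   
       @Setup(Level.Trial)
       public void setup() {
           keys = new int[NUM_ENTRIES];
           for (int i = 0; i < NUM_ENTRIES; i++) {
               keys[i] = (int) (0xffffffffL & ((i * 2862933555777941757L) + 3037000493L));
           }
       }
   }
   ```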





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma edited a comment on pull request #10323: KAFKA-12459; Use property testing library for raft event simulation tests

2021-03-16 Thread GitBox


ijuma edited a comment on pull request #10323:
URL: https://github.com/apache/kafka/pull/10323#issuecomment-800535044


   Thanks, this is very helpful. Since this is EPL 2, we need to check 
https://apache.org/legal/resolved.html#weak-copyleft-licenses



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10323: KAFKA-12459; Use property testing library for raft event simulation tests

2021-03-16 Thread GitBox


ijuma commented on pull request #10323:
URL: https://github.com/apache/kafka/pull/10323#issuecomment-800535044


   Since this is EPL 2, we need to check 
https://apache.org/legal/resolved.html#weak-copyleft-licenses



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12477) Smart rebalancing with dynamic protocol selection

2021-03-16 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17302833#comment-17302833
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12477:


Yes, I totally agree – the second rolling bounce, while not critical, still 
serves as a safety net. However I think we can still implement some kind of 
safety measures in its place, while still encouraging those users who can 
afford to perform the second rolling bounce to continue doing so. For example 
we could enforce that the selected protocol can only ever increase, and throw a 
fatal exception if the consumer has been using the COOPERATIVE protocol and 
suddenly they receive the RangeAssignor from a rebalance which only supports 
EAGER.

> Smart rebalancing with dynamic protocol selection
> -
>
> Key: KAFKA-12477
> URL: https://issues.apache.org/jira/browse/KAFKA-12477
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 3.0.0
>
>
> Users who want to upgrade their applications and enable the COOPERATIVE 
> rebalancing protocol in their consumer apps are required to follow a double 
> rolling bounce upgrade path. The reason for this is laid out in the [Consumer 
> Upgrades|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-Consumer]
>  section of KIP-429. Basically, the ConsumerCoordinator picks a rebalancing 
> protocol in its constructor based on the list of supported partition 
> assignors. The protocol is selected as the highest protocol that is commonly 
> supported by all assignors in the list, and never changes after that.
> This is a bit unfortunate because it may end up using an older protocol even 
> after every member in the group has been updated to support the newer 
> protocol. After the first rolling bounce of the upgrade, all members will 
> have two assignors: "cooperative-sticky" and "range" (or 
> sticky/round-robin/etc). At this point the EAGER protocol will still be 
> selected due to the presence of the "range" assignor, but it's the 
> "cooperative-sticky" assignor that will ultimately be selected for use in 
> rebalances if that assignor is preferred (ie positioned first in the list). 
> The only reason for the second rolling bounce is to strip off the "range" 
> assignor and allow the upgraded members to switch over to COOPERATIVE. We 
> can't allow them to use cooperative rebalancing until everyone has been 
> upgraded, but once they have it's safe to do so.
> And there is already a way for the client to detect that everyone is on the 
> new byte code: if the CooperativeStickyAssignor is selected by the group 
> coordinator, then that means it is supported by all consumers in the group 
> and therefore everyone must be upgraded. 
> We may be able to save the second rolling bounce by dynamically updating the 
> rebalancing protocol inside the ConsumerCoordinator as "the highest protocol 
> supported by the assignor chosen by the group coordinator". This means we'll 
> still be using EAGER at the first rebalance, since we of course need to wait 
> for this initial rebalance to get the response from the group coordinator. 
> But we should take the hint from the chosen assignor rather than dropping 
> this information on the floor and sticking with the original protocol



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #10324: MINOR: Add a few more benchmark for the timeline map

2021-03-16 Thread GitBox


jsancio commented on a change in pull request #10324:
URL: https://github.com/apache/kafka/pull/10324#discussion_r595464237



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
##
@@ -44,33 +49,126 @@
 public class TimelineHashMapBenchmark {
 private final static int NUM_ENTRIES = 1_000_000;
 
+@State(Scope.Thread)
+public static class HashMapInput {
+public HashMap<Integer, String> map;
+public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+map = new HashMap<>(keys.size());
+for (Integer key : keys) {
+map.put(key, String.valueOf(key));
+}
+
+Collections.shuffle(keys);
+}
+}
+
+@State(Scope.Thread)
+public static class ImmutableMapInput {
+scala.collection.immutable.HashMap<Integer, String> map;
+public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+map = new scala.collection.immutable.HashMap<>();
+for (Integer key : keys) {
+map = map.updated(key, String.valueOf(key));
+}
+
+Collections.shuffle(keys);
+}
+}
+
+@State(Scope.Thread)
+public static class TimelineMapInput {
+public SnapshotRegistry snapshotRegistry;
+public TimelineHashMap<Integer, String> map;
+public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+snapshotRegistry = new SnapshotRegistry(new LogContext());
+map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+
+for (Integer key : keys) {
+map.put(key, String.valueOf(key));
+}
+
+Collections.shuffle(keys);
+}
+}
+
+@State(Scope.Thread)
+public static class TimelineMapSnapshotInput {
+public SnapshotRegistry snapshotRegistry;
+public TimelineHashMap<Integer, String> map;
+public final List<Integer> keys = createKeys(NUM_ENTRIES);
+
+@Setup(Level.Invocation)
+public void setup() {
+snapshotRegistry = new SnapshotRegistry(new LogContext());
+map = new TimelineHashMap<>(snapshotRegistry, keys.size());
+
+for (Integer key : keys) {
+map.put(key, String.valueOf(key));
+}
+
+int count = 0;
+for (Integer key : keys) {
+if (count % 1_000 == 0) {
+snapshotRegistry.deleteSnapshotsUpTo(count - 10_000);
+snapshotRegistry.createSnapshot(count);
+}
+map.put(key, String.valueOf(key));
+count++;
+}
+
+Collections.shuffle(keys);
+}
+}
+
+
 @Benchmark
 public Map<Integer, String> testAddEntriesInHashMap() {
-HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES);
+HashMap<Integer, String> map = new HashMap<>();
 for (int i = 0; i < NUM_ENTRIES; i++) {
 int key = (int) (0xffffffffL & ((i * 2862933555777941757L) + 
3037000493L));
 map.put(key, String.valueOf(key));
 }
+
+return map;
+}
+
+@Benchmark
+public scala.collection.immutable.HashMap<Integer, String> 
testAddEntriesInImmutableMap() {
+scala.collection.immutable.HashMap<Integer, String> map = new 
scala.collection.immutable.HashMap<>();
+for (int i = 0; i < NUM_ENTRIES; i++) {
+int key = (int) (0xffffffffL & ((i * 2862933555777941757L) + 
3037000493L));
+map = map.updated(key, String.valueOf(key));
+}
+
 return map;
 }
 
 @Benchmark
 public Map<Integer, String> testAddEntriesInTimelineMap() {
 SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
-TimelineHashMap<Integer, String> map =
-new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES);
+TimelineHashMap<Integer, String> map = new 
TimelineHashMap<>(snapshotRegistry, 16);
 for (int i = 0; i < NUM_ENTRIES; i++) {
 int key = (int) (0xffffffffL & ((i * 2862933555777941757L) + 
3037000493L));

Review comment:
   I think this is an algorithm for generating pseudo-random numbers. I 
think it relates to https://nuclear.llnl.gov/CNP/rng/rngman/node4.html.
   
   If this is true, let me fix the expression, as it is supposed to multiply by 
`key`, not `i`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500

2021-03-16 Thread GitBox


rondagostino commented on a change in pull request #10227:
URL: https://github.com/apache/kafka/pull/10227#discussion_r595456197



##
File path: KIP-500.md
##
@@ -0,0 +1,131 @@
+KIP-500 Early Access Release
+
+
+# Introduction
+It is now possible to run Apache Kafka without Apache ZooKeeper!  We call this 
mode [self-managed 
mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum).
  It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it 
is available for testing in the Kafka 2.8 release.
+
+When the Kafka cluster is in self-managed mode, it does not store its metadata 
in ZooKeeper.  In fact, you do not have to run ZooKeeper at all, because it 
stores its metadata in a Raft quorum of controller nodes.
+
+Self-managed mode has many benefits -- some obvious, and some not so obvious.  
Clearly, it is nice to manage and configure one service rather than two 
services.  In addition, you can now run a single process Kafka cluster.  Most 
important of all, self-managed mode is more scalable.  We expect to be able to 
[support many more topics and 
partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/)
 in this mode.
+
+# Quickstart
+
+## Warning
+Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for 
production.  We do not yet support upgrading existing ZooKeeper-based Kafka 
clusters into this mode.  In fact, when Kafka 3.0 is released, it may not even 
be possible to upgrade your self-managed clusters from 2.8 to 3.0 without 
downtime.  There may be bugs, including serious ones.  You should *assume that 
your data could be lost at any time* if you try the early access release of 
KIP-500.

Review comment:
   > it may not even be possible to upgrade your self-managed clusters from 
2.8 to 3.0 without downtime.
   
   I think this statement as currently worded implies that we are committing to 
supporting an upgrade (potentially with some downtime).  We should drop the 
qualifier "without downtime" at the end if we are not committing to that.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10321: HOTFIX: timeout issue in Remove thread

2021-03-16 Thread GitBox


ableegoldman commented on pull request #10321:
URL: https://github.com/apache/kafka/pull/10321#issuecomment-800516409


   Cherrypicked to 2.8 cc @vvcephei 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tang7526 opened a new pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

2021-03-16 Thread GitBox


tang7526 opened a new pull request #10332:
URL: https://github.com/apache/kafka/pull/10332


   issue : https://issues.apache.org/jira/browse/KAFKA-10697
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10697) Remove ProduceResponse.responses

2021-03-16 Thread Chun-Hao Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chun-Hao Tang reassigned KAFKA-10697:
-

Assignee: Chun-Hao Tang  (was: UnityLung)

> Remove ProduceResponse.responses
> 
>
> Key: KAFKA-10697
> URL: https://issues.apache.org/jira/browse/KAFKA-10697
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chun-Hao Tang
>Priority: Minor
>
> This is a follow-up of KAFKA-9628.
> related discussion: 
> https://github.com/apache/kafka/pull/9401#discussion_r518984349



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #10321: HOTFIX: timeout issue in Remove thread

2021-03-16 Thread GitBox


vvcephei commented on pull request #10321:
URL: https://github.com/apache/kafka/pull/10321#issuecomment-800501385


   Thanks @wcarlson5. Yes, please cherry-pick it to 2.8.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12481) Add socket.nagle.disable config to reduce number of packets

2021-03-16 Thread Andrei Iatsuk (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrei Iatsuk updated KAFKA-12481:
--
Description: 
*What to do?*

Add a _socket.nagle.disable_ parameter to the Apache Kafka config, like the one in 
[librdkafka|https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md].

*What is the reason for this improvement?*

A large number of topic-partitions on one broker causes a burst in the host's packets/sec metric. The traffic shaper in the cloud can no longer cope with such a load, which causes service degradation.

*How to reproduce?*
# Create a Kafka cluster with 4 brokers. The packets/sec rate is ~120.
# Add 100 topics with 100 partitions each and replication factor = 3. That is 30k topic-partitions in total. The packets/sec rate is ~15k.
{code:java}
import os
for i in range(100):
  print(f"create topic 'flower{i}'... ", end="")
  cmd = "kafka-topics.sh --create --bootstrap-server {} --topic {} --partitions 
{} --replication-factor {}".format("databus.andrei-iatsuk.ec.odkl.ru:9092", 
f"flower{i}", 100, 3)
  code = os.system(cmd)
  print("ok" if code == 0 else "error")
{code}
!Screenshot 2021-03-16 at 21.05.03.png!
# Generate server load by launching the following script in 4 terminals. The packets/sec rate is ~130k.
{code:java}
import time
from pykafka import KafkaClient
client = KafkaClient(hosts="databus.andrei-iatsuk.ec.odkl.ru:9092")
while True:
  for i in range(100):
print(f"sent message to 'flower{i}'")
with client.topics[f"flower{i}"].get_sync_producer() as producer:
  for j in range(1000):
producer.produce(str.encode(f'test message {j} in topic flower{i}' * 
10))
{code}
!Screenshot 2021-03-13 at 00.44.43.png! 
 !Screenshot 2021-03-13 at 00.29.10.png!
# Capture a dump of the TCP connections via tcpdump for ~2 sec:
{code:java}
$ tcpdump -i eth1 -w dump.pcap
tcpdump: listening on eth1, link-type EN10MB (Ethernet), capture size 262144 
bytes
^C8873886 packets captured
9139050 packets received by filter
265028 packets dropped by kernel
{code}
# Load the dump into Wireshark and check the statistics: ~99.999% of the packets are inter-broker messages, with packet sizes of 40-160 bytes. In the screenshots, the hosts with IPs 10.16.23.[157-160] are the brokers:
 !Screenshot 2021-03-14 at 01.46.00.png! 
 !Screenshot 2021-03-14 at 01.52.01.png!

*How to fix?*
# Add a boolean _socket.nagle.disable_ parameter to the Apache Kafka config and pass its value to the kafka.network.Acceptor.accept(key) method in: [https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/network/SocketServer.scala#L646]
# With TCP_NODELAY disabled:
## ~400 packets/s for an idle broker (instead of ~12k packets/s)
## ~3k packets/s for a loaded broker (instead of ~150k packets/s)
 !Screenshot 2021-03-16 at 21.12.17.png!
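
A rough sketch of the idea in plain Java NIO (illustrative only; the method shape and config plumbing are assumptions, not the actual patch):
{code:java}
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public final class AcceptSketch {
    // socket.nagle.disable = true  -> TCP_NODELAY on  (current behaviour, more packets)
    // socket.nagle.disable = false -> TCP_NODELAY off (Nagle coalesces small packets)
    static SocketChannel accept(ServerSocketChannel server, boolean nagleDisable) throws Exception {
        SocketChannel channel = server.accept();
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(nagleDisable);  // today this is effectively always true
        channel.socket().setKeepAlive(true);
        return channel;
    }
}
{code}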


[jira] [Created] (KAFKA-12481) Add socket.nagle.disable config to reduce number of packets

2021-03-16 Thread Andrei Iatsuk (Jira)
Andrei Iatsuk created KAFKA-12481:
-

 Summary: Add socket.nagle.disable config to reduce number of 
packets 
 Key: KAFKA-12481
 URL: https://issues.apache.org/jira/browse/KAFKA-12481
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.6.1, 2.7.0, 2.5.1, 2.4.1, 2.3.1, 2.2.2, 2.1.1, 2.0.1, 
1.1.1, 1.0.2, 0.11.0.3, 0.10.2.2, 0.8.2.2
Reporter: Andrei Iatsuk
 Attachments: Screenshot 2021-03-13 at 00.29.10.png, Screenshot 
2021-03-13 at 00.44.43.png, Screenshot 2021-03-14 at 01.46.00.png, Screenshot 
2021-03-14 at 01.52.01.png, Screenshot 2021-03-16 at 21.05.03.png, Screenshot 
2021-03-16 at 21.12.17.png

*What to do?*

Add a _socket.nagle.disable_ parameter to the Apache Kafka config, like the one in 
[librdkafka|https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md].

*What is the reason for this improvement?*

A large number of topic-partitions on one broker causes a burst in the host's packets/sec metric. The traffic shaper in the cloud can no longer cope with such a load, which causes service degradation.

*How to reproduce?*
 # Create a Kafka cluster with 4 brokers. The packets/sec rate is ~120.
 # Add 100 topics with 100 partitions each and replication factor = 3. That is 30k topic-partitions in total. The packets/sec rate is ~15k.
{code:java}
import os
for i in range(100):
  print(f"create topic 'flower{i}'... ", end="")
  cmd = "kafka-topics.sh --create --bootstrap-server {} --topic {} --partitions 
{} --replication-factor {}".format("databus.andrei-iatsuk.ec.odkl.ru:9092", 
f"flower{i}", 100, 3)
  code = os.system(cmd)
  print("ok" if code == 0 else "error")
{code}
!Screenshot 2021-03-16 at 21.05.03.png!

 # Generate server load by launching the following script in 4 terminals. The packets/sec rate is ~130k.
{code:java}
import time
from pykafka import KafkaClient
client = KafkaClient(hosts="databus.andrei-iatsuk.ec.odkl.ru:9092")
while True:
  for i in range(100):
print(f"sent message to 'flower{i}'")
with client.topics[f"flower{i}"].get_sync_producer() as producer:
  for j in range(1000):
producer.produce(str.encode(f'test message {j} in topic flower{i}' * 
10))
{code}
!Screenshot 2021-03-13 at 00.44.43.png! 
 !Screenshot 2021-03-13 at 00.29.10.png!

 # Capture a dump of the TCP connections via tcpdump for ~2 sec:
{code:java}
$ tcpdump -i eth1 -w dump.pcap
tcpdump: listening on eth1, link-type EN10MB (Ethernet), capture size 262144 
bytes
^C8873886 packets captured
9139050 packets received by filter
265028 packets dropped by kernel
{code}

 # Load the dump into Wireshark and check the statistics: ~99.999% of the packets are inter-broker messages, with packet sizes of 40-160 bytes. In the screenshots, the hosts with IPs 10.16.23.[157-160] are the brokers:
 !Screenshot 2021-03-14 at 01.46.00.png! 
 !Screenshot 2021-03-14 at 01.52.01.png!

*How to fix?*
 # Add a boolean _socket.nagle.disable_ parameter to the Apache Kafka config and pass its value to the kafka.network.Acceptor.accept(key) method in: [https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/network/SocketServer.scala#L646]
 # With TCP_NODELAY disabled:
 ## ~400 packets/s for an idle broker (instead of ~12k packets/s)
 ## ~3k packets/s for a loaded broker (instead of ~150k packets/s)
 !Screenshot 2021-03-16 at 21.12.17.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] stanislavkozlovski commented on pull request #10323: KAFKA-12459; Use property testing library for raft event simulation tests

2021-03-16 Thread GitBox


stanislavkozlovski commented on pull request #10323:
URL: https://github.com/apache/kafka/pull/10323#issuecomment-800488343


   For what it's worth, I evaluated jqwik vs Quickcheck and put together the following bullet-point summaries:
   
   ### Quickcheck
   - 819 stars
   - 10-year old library
   - v1.0 released Nov 22, 2020
   - 25 open issues
   - 55 commits in the last year
   - MIT License
   - light documentation
   - supports shrinking
   
   ### Jqwik
   - 260 stars
   - 5-year old library
   - v1.5 release Feb 2021
   - 16 open issues
   - 1039 commits in the last year
   - EPL-2.0 License
   - extensive documentation
   - very configurable
   - supports shrinking
   
   My reasoning for preferring Jqwik was that it seemed more actively maintained, had good interfaces, had very extensive documentation (I value this heavily), and, most importantly, supports programmatic parameter generation, meaning it lets you easily express dependencies between randomized inputs. I got the notion from [this blog post](https://www.leadingagile.com/2018/04/step-by-step-toward-property-based-testing/) that generating dependent random inputs is one of the trickier parts of writing more complex test cases.
   
   Jqwik has some other interesting features, like [collecting and reporting statistics](https://jqwik.net/docs/current/user-guide.html#collecting-and-reporting-statistics) on the data it generates, allowing you to inspect the generated data and whether it's useful or can be tweaked.
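   
   To make the "programmatic parameter generation" point concrete, here is a small sketch of dependent inputs using jqwik's `@Provide`/`Arbitraries` API (the domain names are made up, and the statistics call just follows their docs):
   
   ```java
   import net.jqwik.api.*;
   import net.jqwik.api.statistics.Statistics;
   
   class DependentInputExample {
       // A (partitionCount, partitionIndex) pair where the index depends on the count --
       // the kind of input dependency that is awkward to express in some other libraries.
       @Provide
       Arbitrary<int[]> partitionAndIndex() {
           return Arbitraries.integers().between(1, 64)
               .flatMap(count -> Arbitraries.integers().between(0, count - 1)
                   .map(index -> new int[]{count, index}));
       }
   
       @Property
       boolean indexNeverExceedsCount(@ForAll("partitionAndIndex") int[] pair) {
           Statistics.collect(pair[0] > 32 ? "large count" : "small count");  // optional reporting
           return pair[1] >= 0 && pair[1] < pair[0];
       }
   }
   ```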



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10318: KAFKA-12330; FetchSessionCache may cause starvation for partitions when FetchResponse is full

2021-03-16 Thread GitBox


chia7712 commented on a change in pull request #10318:
URL: https://github.com/apache/kafka/pull/10318#discussion_r595414671



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -428,7 +428,7 @@ class IncrementalFetchContext(private val time: Time,
 val mustRespond = cachedPart.maybeUpdateResponseData(respData, 
updateFetchContextAndRemoveUnselected)
 if (mustRespond) {
   nextElement = element
-  if (updateFetchContextAndRemoveUnselected) {
+  if (updateFetchContextAndRemoveUnselected && 
FetchResponse.recordsSize(respData) > 0) {

Review comment:
   Thanks for nice explanation!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



