[jira] [Commented] (KAFKA-12506) Expand AdjustStreamThreadCountTest

2021-04-17 Thread Aviral Srivastava (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324391#comment-17324391
 ] 

Aviral Srivastava commented on KAFKA-12506:
---

Hi [~ableegoldman], upon running the command: `./gradlew streams:test`, I am 
getting a successful build:

```


Deprecated Gradle features were used in this build, making it incompatible with 
Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See 
https://docs.gradle.org/6.8.3/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 13m 31s
46 actionable tasks: 13 executed, 33 up-to-date

```

 

Full log: 
https://docs.google.com/document/d/1NwnEMFd926zbfM56fs54vjcFnqPXW5x_Rr2C8Qg4kvQ/edit?usp=sharing

> Expand AdjustStreamThreadCountTest
> --
>
> Key: KAFKA-12506
> URL: https://issues.apache.org/jira/browse/KAFKA-12506
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: A. Sophie Blee-Goldman
>Assignee: Aviral Srivastava
>Priority: Major
>  Labels: newbie, newbie++
>
> Right now the AdjustStreamThreadCountTest runs a minimal topology that just 
> consumes a single input topic, and doesn't produce any data to this topic. 
> Some of the complex concurrency bugs that we've found only showed up when we 
> had some actual data to process and a stateful topology: 
> [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] KAFKA-12500
> See the umbrella ticket for the list of improvements we need to make this a 
> more effective integration test
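
For context, a minimal sketch (not from the ticket or any PR) of the kind of
stateful topology over real input data that the expanded test could run; topic
and store names are illustrative. A count aggregation forces a state store, so
adding/removing stream threads happens while real records and state are in
flight, which is the situation where the concurrency bugs mentioned above
(e.g. KAFKA-12503) showed up:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;

public class StatefulTopologySketch {
    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        // Consume real records, group by key, and maintain a counting state
        // store so that thread count changes exercise stateful task migration.
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .count(Materialized.as("counts-store"));
        return builder.build();
    }
}
```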



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wenbingshen commented on pull request #10553: MINOR: Fix the negative time difference value from Log.scala

2021-04-17 Thread GitBox


wenbingshen commented on pull request #10553:
URL: https://github.com/apache/kafka/pull/10553#issuecomment-821914238


   @dajac Can you take a look at this PR? Thanks. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on pull request #10553: MINOR: Fix the negative time difference value from Log.scala

2021-04-17 Thread GitBox


wenbingshen commented on pull request #10553:
URL: https://github.com/apache/kafka/pull/10553#issuecomment-821913990


   @lbradstreet  Can you take a look at this PR? I think you made a mistake by 
accident. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-17 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324378#comment-17324378
 ] 

Luke Chen commented on KAFKA-12675:
---

Awesome! I'll also check your code to see how we can improve in KAFKA-12676. 
Thank you.

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, we have the "general assignor" for the non-equal subscription case 
> and the "constrained assignor" for the all-equal subscription case. There's a 
> performance test for the constrained assignor with:
> topicCount = 500;
>  partitionCount = 2000; 
>  consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_, i.e. 1 million 
> partitions in total, and we can complete the assignment within 2 seconds on 
> my machine.
> However, if we let just 1 of the consumers subscribe to only 1 topic, the 
> "general assignor" is used instead, and the result with the same settings as 
> above is *OutOfMemory*.
>  Even when we reduce the counts to:
> topicCount = 50;
>  partitionCount = 1000; 
>  consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
>  partitionCount = 800; 
>  consumerCount = 800;
> we can complete in 10 seconds on my machine, which is still slow.
>  
> Since we are going to set the default assignment strategy to 
> "CooperativeStickyAssignor" soon, we should improve the scalability and 
> performance of the sticky general assignor.
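
To make the trigger concrete, a small sketch (plain Java, not the actual test
code) of the subscription shape described above; one consumer with a narrower
subscription is enough to fall back from the constrained assignor to the
general one:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SubscriptionShapeSketch {
    // Builds the consumer -> subscribed-topics map used in the scenario above.
    public static Map<String, List<String>> buildSubscriptions(int topicCount, int consumerCount) {
        List<String> allTopics = new ArrayList<>();
        for (int t = 0; t < topicCount; t++) {
            allTopics.add("topic" + t);
        }
        Map<String, List<String>> subscriptions = new HashMap<>();
        for (int c = 0; c < consumerCount; c++) {
            subscriptions.put("consumer" + c, allTopics);  // uniform: constrained assignor
        }
        // One non-uniform subscriber is enough to route the whole group
        // through the (currently much slower) general assignor.
        subscriptions.put("consumer0", Collections.singletonList("topic0"));
        return subscriptions;
    }
}
```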



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-17 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r615204809



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataContext.java
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import java.util.Objects;
+
+/**
+ * The context associated with the record in remote log metadata topic. This 
contains api-key, version and the
+ * payload object.
+ * <p>
+ * For example:
+ * Remote log segment metadata record will have
+ * <ul>
+ * <li>api key as: {@link 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord#apiKey()},</li>
+ * <li>version as: 0 (or the respective version), and</li>
+ * <li>payload as: {@link 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata}</li>
+ * </ul>
+ *
+ * You can read more details in <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-MessageFormat">KIP-405</a>
+ */
+public class RemoteLogMetadataContext {

Review comment:
   Sure, I can reuse `ApiMessageAndVersion`. +1 to have these in a common 
module. I prefer having them in a new module called `server-common` (or a 
better name) instead of putting them in the clients module. We can slowly 
move server-common-related code out of the clients module. I will raise a 
separate PR for this. 
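
The kind of container being proposed for reuse might look like the following;
this is an illustrative sketch only, not the actual `ApiMessageAndVersion`
class:

```java
import java.util.Objects;

public final class MessageAndVersionSketch {
    private final Object message;  // e.g. a generated RemoteLogSegmentMetadataRecord
    private final short version;   // schema version used to (de)serialize the message

    public MessageAndVersionSketch(Object message, short version) {
        this.message = Objects.requireNonNull(message);
        this.version = version;
    }

    public Object message() {
        return message;
    }

    public short version() {
        return version;
    }
}
```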




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9119) KIP-500: Replace ZooKeeper with a Metadata Quorum

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-9119.
-
Fix Version/s: 2.8.0
   Resolution: Done

> KIP-500: Replace ZooKeeper with a Metadata Quorum
> -
>
> Key: KAFKA-9119
> URL: https://issues.apache.org/jira/browse/KAFKA-9119
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
> Fix For: 2.8.0
>
>
> Umbrella JIRA for tasks related to KIP-500: Replace ZooKeeper with a Metadata 
> Quorum



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9705) Zookeeper mutation protocols should be redirected to Controller only

2021-04-17 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324370#comment-17324370
 ] 

Colin McCabe commented on KAFKA-9705:
-

[~vvcephei]: this isn't a blocker for 2.8, but we will have to do some of these 
things for 3.0.

> Zookeeper mutation protocols should be redirected to Controller only
> 
>
> Key: KAFKA-9705
> URL: https://issues.apache.org/jira/browse/KAFKA-9705
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 3.0.0
>
>
> In the bridge release, we need to restrict the direct access of ZK to 
> controller only. This means the existing AlterConfig path should be migrated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9125) GroupMetadataManager and TransactionStateManager should query the controller instead of zkClient

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-9125.
-
Fix Version/s: 2.8.0
 Assignee: Ron Dagostino  (was: Viktor Somogyi-Vass)
   Resolution: Fixed

> GroupMetadataManager and TransactionStateManager should query the controller 
> instead of zkClient
> 
>
> Key: KAFKA-9125
> URL: https://issues.apache.org/jira/browse/KAFKA-9125
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Viktor Somogyi-Vass
>Assignee: Ron Dagostino
>Priority: Major
> Fix For: 2.8.0
>
>
> Both classes query their respective topic's partition count via the zkClient. 
> This, however, could instead be obtained from the broker's local metadata cache.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9125) GroupMetadataManager and TransactionStateManager should query the controller instead of zkClient

2021-04-17 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324369#comment-17324369
 ] 

Colin McCabe commented on KAFKA-9125:
-

We do indeed now get GroupMetadataManager and TransactionStateManager metadata 
from the metadata topic (and associated cache) when in KRaft (kip-500) mode.  
Closing as fixed.

> GroupMetadataManager and TransactionStateManager should query the controller 
> instead of zkClient
> 
>
> Key: KAFKA-9125
> URL: https://issues.apache.org/jira/browse/KAFKA-9125
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Both classes query their respective topic's partition count via the zkClient. 
> This, however, could instead be obtained from the broker's local metadata cache.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10691) AlterIsr Respond with wrong Error Id

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-10691:
-
Labels: kip-500  (was: )

> AlterIsr Respond with wrong Error Id
> 
>
> Key: KAFKA-10691
> URL: https://issues.apache.org/jira/browse/KAFKA-10691
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>  Labels: kip-500
>
> An AlterIsr request sent by an unknown broker currently gets a 
> STALE_BROKER_EPOCH response, when it should be UNKNOWN_MEMBER_ID. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8820) kafka-reassign-partitions.sh should support the KIP-455 API

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-8820:

Labels: kip-500  (was: )

> kafka-reassign-partitions.sh should support the KIP-455 API
> ---
>
> Key: KAFKA-8820
> URL: https://issues.apache.org/jira/browse/KAFKA-8820
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
> Fix For: 2.5.0
>
>
> KIP-455 and KAFKA-8345 add a protocol and AdminAPI that will be used for 
> replica reassignments. We need to update the reassignment tool to use this 
> new API rather than work with ZK directly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9119) KIP-500: Replace ZooKeeper with a Metadata Quorum

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-9119:

Labels: kip-500  (was: )

> KIP-500: Replace ZooKeeper with a Metadata Quorum
> -
>
> Key: KAFKA-9119
> URL: https://issues.apache.org/jira/browse/KAFKA-9119
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
>
> Umbrella JIRA for tasks related to KIP-500: Replace ZooKeeper with a Metadata 
> Quorum



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9167) Implement a broker to controller request channel

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-9167:

Labels: kip-500  (was: )

> Implement a broker to controller request channel
> 
>
> Key: KAFKA-9167
> URL: https://issues.apache.org/jira/browse/KAFKA-9167
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, core
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>  Labels: kip-500
>
> In some cases, we will need to create a new API to replace an operation that 
> was formerly done via ZooKeeper.  One example of this is that when the leader 
> of a partition wants to modify the in-sync replica set, it currently modifies 
> ZooKeeper directly. In the post-ZK world, the leader will make an RPC to the 
> active controller instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9154) ProducerId generation should be managed by the Controller

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-9154:

Labels: kip-500  (was: )

> ProducerId generation should be managed by the Controller
> -
>
> Key: KAFKA-9154
> URL: https://issues.apache.org/jira/browse/KAFKA-9154
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Viktor Somogyi-Vass
>Assignee: David Arthur
>Priority: Major
>  Labels: kip-500
>
> Currently producerIds are maintained in Zookeeper, but in the future we'd like 
> them to be managed by the controller quorum in an internal topic.
> The reason for storing this in Zookeeper was that the ID must be unique across 
> the cluster. In this task it should be refactored such that the 
> TransactionManager asks the Controller for a ProducerId, and the Controller in 
> turn connects to Zookeeper to acquire the ID. Since ZK is the single source of 
> truth and the PID won't be cached anywhere else, it should be safe (just one 
> extra hop added).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9166) Implement MetadataFetch API

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-9166:

Labels: kip-500  (was: )

> Implement MetadataFetch API
> ---
>
> Key: KAFKA-9166
> URL: https://issues.apache.org/jira/browse/KAFKA-9166
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller
>Reporter: Viktor Somogyi-Vass
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
> Fix For: 2.8.0
>
>
> Brief description of the ask is mentioned in KIP-500's 
> [BrokerMetadataManagement|https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum#KIP-500:ReplaceZooKeeperwithaSelf-ManagedMetadataQuorum-BrokerMetadataManagement]
> Instead of the controller pushing out updates to the other brokers, those 
> brokers will fetch updates from the active controller via the new 
> MetadataFetch API.
> A MetadataFetch is similar to a fetch request. Just like with a fetch 
> request, the broker will track the offset of the last updates it fetched, and 
> only request newer updates from the active controller.
> The broker will persist the metadata it fetched to disk. This will allow the 
> broker to start up very quickly, even if there are hundreds of thousands or 
> even millions of partitions. (Note that since this persistence is an 
> optimization, we can leave it out of the first version, if it makes 
> development easier.)
> Most of the time, the broker should only need to fetch the deltas, not the 
> full state. However, if the broker is too far behind the active controller, 
> or if the broker has no cached metadata at all, the controller will send a 
> full metadata image rather than a series of deltas.
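
A hypothetical sketch of the broker-side logic described above, with every
name illustrative rather than part of the actual Kafka API:

```java
import java.util.List;

final class MetadataFetchSketch {

    static final class FetchResult {
        final boolean fullImage;     // true when the broker was too far behind (or had no cache)
        final long nextOffset;       // offset the next fetch should resume from
        final List<String> records;  // metadata deltas, or the full image

        FetchResult(boolean fullImage, long nextOffset, List<String> records) {
            this.fullImage = fullImage;
            this.nextOffset = nextOffset;
            this.records = records;
        }
    }

    interface ControllerChannel {
        FetchResult fetch(long lastFetchedOffset);
    }

    // One round of the fetch loop: apply deltas (the common case) or a full
    // image, then return the new offset so it can be persisted and a restart
    // can resume from it instead of refetching everything.
    static long fetchOnce(ControllerChannel channel, long lastFetchedOffset) {
        FetchResult result = channel.fetch(lastFetchedOffset);
        if (result.fullImage) {
            System.out.println("replacing cached metadata with a full image of "
                    + result.records.size() + " records");
        } else {
            for (String record : result.records) {
                System.out.println("applying delta: " + record);
            }
        }
        return result.nextOffset;
    }
}
```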



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9154) ProducerId generation should be managed by the Controller

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-9154:

Parent: (was: KAFKA-9119)
Issue Type: New Feature  (was: Sub-task)

> ProducerId generation should be managed by the Controller
> -
>
> Key: KAFKA-9154
> URL: https://issues.apache.org/jira/browse/KAFKA-9154
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Viktor Somogyi-Vass
>Assignee: David Arthur
>Priority: Major
>
> Currently producerIds are maintained in Zookeeper, but in the future we'd like 
> them to be managed by the controller quorum in an internal topic.
> The reason for storing this in Zookeeper was that the ID must be unique across 
> the cluster. In this task it should be refactored such that the 
> TransactionManager asks the Controller for a ProducerId, and the Controller in 
> turn connects to Zookeeper to acquire the ID. Since ZK is the single source of 
> truth and the PID won't be cached anywhere else, it should be safe (just one 
> extra hop added).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9154) ProducerId generation should be managed by the Controller

2021-04-17 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324367#comment-17324367
 ] 

Colin McCabe commented on KAFKA-9154:
-

[~satish.duggana]: [~mumrah] posted a KIP about this, KIP-730.  Let me see if I 
can find some other stuff you might be interested in...
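
For context, KIP-730 proposes moving producer id generation to the controller;
a hypothetical sketch of a block-based allocation scheme in that spirit
(illustrative only, not the actual design or API):

```java
import java.util.concurrent.atomic.AtomicLong;

final class ProducerIdBlockSketch {
    static final long BLOCK_SIZE = 1000;

    private final AtomicLong nextBlockStart = new AtomicLong(0);

    // Controller side: atomically reserve a contiguous block [start, end) that
    // the requesting broker can then hand out locally, so ids stay unique
    // cluster-wide with only one round trip per block instead of a ZooKeeper
    // read per producer.
    long[] allocateBlock() {
        long start = nextBlockStart.getAndAdd(BLOCK_SIZE);
        return new long[] {start, start + BLOCK_SIZE};
    }
}
```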

> ProducerId generation should be managed by the Controller
> -
>
> Key: KAFKA-9154
> URL: https://issues.apache.org/jira/browse/KAFKA-9154
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Viktor Somogyi-Vass
>Assignee: David Arthur
>Priority: Major
>
> Currently producerIds are maintained in Zookeeper, but in the future we'd like 
> them to be managed by the controller quorum in an internal topic.
> The reason for storing this in Zookeeper was that the ID must be unique across 
> the cluster. In this task it should be refactored such that the 
> TransactionManager asks the Controller for a ProducerId, and the Controller in 
> turn connects to Zookeeper to acquire the ID. Since ZK is the single source of 
> truth and the PID won't be cached anywhere else, it should be safe (just one 
> extra hop added).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9154) ProducerId generation should be managed by the Controller

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-9154:
---

Assignee: David Arthur  (was: Colin McCabe)

> ProducerId generation should be managed by the Controller
> -
>
> Key: KAFKA-9154
> URL: https://issues.apache.org/jira/browse/KAFKA-9154
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Viktor Somogyi-Vass
>Assignee: David Arthur
>Priority: Major
>
> Currently producerIds are maintained in Zookeeper, but in the future we'd like 
> them to be managed by the controller quorum in an internal topic.
> The reason for storing this in Zookeeper was that the ID must be unique across 
> the cluster. In this task it should be refactored such that the 
> TransactionManager asks the Controller for a ProducerId, and the Controller in 
> turn connects to Zookeeper to acquire the ID. Since ZK is the single source of 
> truth and the PID won't be cached anywhere else, it should be safe (just one 
> extra hop added).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9837) New RPC for notifying controller of failed replica

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-9837:

Parent: (was: KAFKA-9119)
Issue Type: New Feature  (was: Sub-task)

> New RPC for notifying controller of failed replica
> --
>
> Key: KAFKA-9837
> URL: https://issues.apache.org/jira/browse/KAFKA-9837
> Project: Kafka
>  Issue Type: New Feature
>  Components: controller, core
>Reporter: David Arthur
>Assignee: dengziming
>Priority: Major
>  Labels: kip-500
> Fix For: 2.9
>
>
> This is the tracking ticket for 
> [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller].
>  For the bridge release, brokers should no longer use ZooKeeper to notify the 
> controller that a log dir has failed. It should instead use an RPC mechanism.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9837) New RPC for notifying controller of failed replica

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-9837:

Labels: kip-500  (was: )

> New RPC for notifying controller of failed replica
> --
>
> Key: KAFKA-9837
> URL: https://issues.apache.org/jira/browse/KAFKA-9837
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, core
>Reporter: David Arthur
>Assignee: dengziming
>Priority: Major
>  Labels: kip-500
> Fix For: 2.9
>
>
> This is the tracking ticket for 
> [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller].
>  For the bridge release, brokers should no longer use ZooKeeper to notify the 
> controller that a log dir has failed. It should instead use an RPC mechanism.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

2021-04-17 Thread GitBox


kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r615310932



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -187,17 +192,24 @@ private static PluginClassLoader newPluginClassLoader(
 );
 }
 
-    private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
+    //visible for testing
+    <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
         for (PluginDesc<T> plugin : plugins) {
             String pluginClassName = plugin.className();
             SortedMap<PluginDesc<T>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+            boolean pluginConflict = false;
             if (inner == null) {
                 inner = new TreeMap<>();
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+            } else {
+                pluginConflict = true;
             }
             inner.put(plugin, loader);
+            if (pluginConflict) {
+                log.error("Detected multiple copies of plugin '{}', one of 
these will be used '{}'", pluginClassName, inner.keySet());
+            }
Review comment:
   I had discussed that case with Randall, and you are right: detecting and 
logging only once would require inspecting the map after all plugins are 
loaded. However, we will soon be working on dynamically loading plugins at 
runtime to improve worker startup time, so restructuring it that way would be 
a moot point. That is why it's OK to see the same statement twice; it would be 
rare for someone to have three copies of the same plugin.
   
   Sure, let me make the change to log which copy will be used as of the time 
the statement is logged.
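
A self-contained sketch of the kind of logging being discussed (hypothetical,
not the actual Connect code; it assumes the sorted map's last key is the copy
the framework will pick):

```java
import java.util.TreeMap;

public class PluginConflictLogSketch {
    public static void main(String[] args) {
        // version -> jar location, sorted so that lastKey() is the newest copy
        TreeMap<String, String> copies = new TreeMap<>();
        copies.put("1.0", "/plugins/foo-connector-1.0.jar");
        copies.put("2.0", "/plugins/foo-connector-2.0.jar");
        if (copies.size() > 1) {
            System.err.printf(
                    "Detected multiple copies of plugin '%s'; '%s' will be used out of %s%n",
                    "com.example.FooConnector", copies.lastKey(), copies.keySet());
        }
    }
}
```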




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9166) Implement MetadataFetch API

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-9166.
-
Fix Version/s: 2.8.0
   Resolution: Duplicate

Closing as duplicate of KAFKA-10435

> Implement MetadataFetch API
> ---
>
> Key: KAFKA-9166
> URL: https://issues.apache.org/jira/browse/KAFKA-9166
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller
>Reporter: Viktor Somogyi-Vass
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 2.8.0
>
>
> Brief description of the ask is mentioned in KIP-500's 
> [BrokerMetadataManagement|https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum#KIP-500:ReplaceZooKeeperwithaSelf-ManagedMetadataQuorum-BrokerMetadataManagement]
> Instead of the controller pushing out updates to the other brokers, those 
> brokers will fetch updates from the active controller via the new 
> MetadataFetch API.
> A MetadataFetch is similar to a fetch request. Just like with a fetch 
> request, the broker will track the offset of the last updates it fetched, and 
> only request newer updates from the active controller.
> The broker will persist the metadata it fetched to disk. This will allow the 
> broker to start up very quickly, even if there are hundreds of thousands or 
> even millions of partitions. (Note that since this persistence is an 
> optimization, we can leave it out of the first version, if it makes 
> development easier.)
> Most of the time, the broker should only need to fetch the deltas, not the 
> full state. However, if the broker is too far behind the active controller, 
> or if the broker has no cached metadata at all, the controller will send a 
> full metadata image rather than a series of deltas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9118) LogDirFailureHandler shouldn't use Zookeeper

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-9118:

Parent: (was: KAFKA-9119)
Issue Type: Improvement  (was: Sub-task)

> LogDirFailureHandler shouldn't use Zookeeper
> 
>
> Key: KAFKA-9118
> URL: https://issues.apache.org/jira/browse/KAFKA-9118
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> As described in 
> [KIP-112|https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD#KIP-112:HandlediskfailureforJBOD-Zookeeper]:
> {noformat}
> 2. A log directory stops working on a broker during runtime
> - The controller watches the path /log_dir_event_notification for new znode.
> - The broker detects offline log directories during runtime.
> - The broker takes actions as if it has received StopReplicaRequest for this 
> replica. More specifically, the replica is no longer considered leader and is 
> removed from any replica fetcher thread. (The clients will receive a 
> UnknownTopicOrPartitionException at this point)
> - The broker notifies the controller by creating a sequential znode under 
> path /log_dir_event_notification with data of the format {"version" : 1, 
> "broker" : brokerId, "event" : LogDirFailure}.
> - The controller reads the znode to get the brokerId and finds that the event 
> type is LogDirFailure.
> - The controller deletes the notification znode
> - The controller sends LeaderAndIsrRequest to that broker to query the state 
> of all topic partitions on the broker. The LeaderAndIsrResponse from this 
> broker will specify KafkaStorageException for those partitions that are on 
> the bad log directories.
> - The controller updates the information of offline replicas in memory and 
> trigger leader election as appropriate.
> - The controller removes offline replicas from ISR in the ZK and sends 
> LeaderAndIsrRequest with updated ISR to be used by partition leaders.
> - The controller propagates the information of offline replicas to brokers by 
> sending UpdateMetadataRequest.
> {noformat}
> Instead of the notification ZNode we should use a Kafka protocol that sends a 
> notification message to the controller with the offline partitions. The 
> controller then updates the information of offline replicas in memory and 
> triggers leader election, then removes the replicas from ISR in ZK and sends a 
> LAIR and an UpdateMetadataRequest.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9167) Implement a broker to controller request channel

2021-04-17 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324359#comment-17324359
 ] 

Colin McCabe commented on KAFKA-9167:
-

Closing as duplicate of KAFKA-10270

> Implement a broker to controller request channel
> 
>
> Key: KAFKA-9167
> URL: https://issues.apache.org/jira/browse/KAFKA-9167
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, core
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> In some cases, we will need to create a new API to replace an operation that 
> was formerly done via ZooKeeper.  One example of this is that when the leader 
> of a partition wants to modify the in-sync replica set, it currently modifies 
> ZooKeeper directly. In the post-ZK world, the leader will make an RPC to the 
> active controller instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9124) KIP-497: ISR changes should be propagated via Kafka protocol

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-9124.
-
Fix Version/s: 2.7.0
   Resolution: Fixed

> KIP-497: ISR changes should be propagated via Kafka protocol
> 
>
> Key: KAFKA-9124
> URL: https://issues.apache.org/jira/browse/KAFKA-9124
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Viktor Somogyi-Vass
>Assignee: David Arthur
>Priority: Major
> Fix For: 2.7.0
>
>
> Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} update 
> Zookeeper, which the controller listens to; that is how it notices the 
> ISR changes and sends out metadata requests.
> Instead of this, the brokers should use Kafka protocol messages to send out 
> ISR change notifications.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-17 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324358#comment-17324358
 ] 

Travis Bischel commented on KAFKA-12675:


Great insight on getting rid of partition2AllPotentialConsumers, as well as 
keeping some more things sorted! I was able to translate that into my own code 
and dropped the large imbalance from 9.5s to 0.5s, as well as from 8.5G memory 
util to 0.5G :)

I'll take a look at the code more in depth soon.

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, we have the "general assignor" for the non-equal subscription case 
> and the "constrained assignor" for the all-equal subscription case. There's a 
> performance test for the constrained assignor with:
> topicCount = 500;
>  partitionCount = 2000; 
>  consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_, i.e. 1 million 
> partitions in total, and we can complete the assignment within 2 seconds on 
> my machine.
> However, if we let just 1 of the consumers subscribe to only 1 topic, the 
> "general assignor" is used instead, and the result with the same settings as 
> above is *OutOfMemory*.
>  Even when we reduce the counts to:
> topicCount = 50;
>  partitionCount = 1000; 
>  consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
>  partitionCount = 800; 
>  consumerCount = 800;
> we can complete in 10 seconds on my machine, which is still slow.
>  
> Since we are going to set the default assignment strategy to 
> "CooperativeStickyAssignor" soon, we should improve the scalability and 
> performance of the sticky general assignor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9124) KIP-497: ISR changes should be propagated via Kafka protocol

2021-04-17 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324357#comment-17324357
 ] 

Colin McCabe commented on KAFKA-9124:
-

[~mumrah] implemented this here: https://github.com/apache/kafka/pull/9100

> KIP-497: ISR changes should be propagated via Kafka protocol
> 
>
> Key: KAFKA-9124
> URL: https://issues.apache.org/jira/browse/KAFKA-9124
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Viktor Somogyi-Vass
>Assignee: David Arthur
>Priority: Major
>
> Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} update 
> Zookeeper, which the controller listens to; that is how it notices the 
> ISR changes and sends out metadata requests.
> Instead of this, the brokers should use Kafka protocol messages to send out 
> ISR change notifications.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9124) KIP-497: ISR changes should be propagated via Kafka protocol

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-9124:
---

Assignee: David Arthur  (was: Colin McCabe)

> KIP-497: ISR changes should be propagated via Kafka protocol
> 
>
> Key: KAFKA-9124
> URL: https://issues.apache.org/jira/browse/KAFKA-9124
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Viktor Somogyi-Vass
>Assignee: David Arthur
>Priority: Major
>
> Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} update 
> Zookeeper, which the controller listens to; that is how it notices the 
> ISR changes and sends out metadata requests.
> Instead of this, the brokers should use Kafka protocol messages to send out 
> ISR change notifications.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9119) KIP-500: Replace ZooKeeper with a Metadata Quorum

2021-04-17 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324356#comment-17324356
 ] 

Colin McCabe commented on KAFKA-9119:
-

The upcoming Kafka 2.8 release now supports running in KRaft (kip-500) mode as 
an early access feature.  Thanks to everyone who helped push this over the 
finish line!

Note: another way to find KRaft / kip-500 changes, besides through this 
umbrella JIRA, is to look for the "kip-500" JIRA tag and the "kip-500" github 
PR tag.

> KIP-500: Replace ZooKeeper with a Metadata Quorum
> -
>
> Key: KAFKA-9119
> URL: https://issues.apache.org/jira/browse/KAFKA-9119
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>
> Umbrella JIRA for tasks related to KIP-500: Replace ZooKeeper with a Metadata 
> Quorum



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12345.
--
Fix Version/s: 2.8.0
 Assignee: Boyang Chen
   Resolution: Fixed

> KIP-500: AlterIsrManager crashes on broker idle-state
> -
>
> Key: KAFKA-12345
> URL: https://issues.apache.org/jira/browse/KAFKA-12345
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Alok Nikhil
>Assignee: Boyang Chen
>Priority: Minor
>  Labels: kip-500
> Fix For: 2.8.0
>
>
> Occasionally, a scheduler thread on a broker crashes with this stack
>  
> {code:java}
> [2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task 
> 'send-alter-isr' (kafka.utils.KafkaScheduler)
>  java.lang.NullPointerException
>  at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117)
>  at 
> kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85)
>  at 
> kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>  at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:834){code}
>  
> After that the broker is unable to fetch any records from any other broker 
> (and vice versa)
> {code:java}
> [2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, 
> fetcherId=0] Error sending fetch request (sessionId=164432409
>  2, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler)
>  java.io.IOException: Connection to 4 was disconnected before the response 
> was read
>  at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139)
>  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12349) Follow up on PartitionEpoch in KIP-500

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-12349:


Assignee: Jason Gustafson  (was: Colin McCabe)

> Follow up on PartitionEpoch in KIP-500
> --
>
> Key: KAFKA-12349
> URL: https://issues.apache.org/jira/browse/KAFKA-12349
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Jason Gustafson
>Priority: Major
>
> * Remove the compatibility shim between raft and the kip-500 controller
> * standardize on the epoch data type (probably int)
> * review partition epoch, leader epoch



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12349) Follow up on PartitionEpoch in KIP-500

2021-04-17 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324355#comment-17324355
 ] 

Colin McCabe commented on KAFKA-12349:
--

[~hachikuji] has a PR for removing the compatibility shim here

> Follow up on PartitionEpoch in KIP-500
> --
>
> Key: KAFKA-12349
> URL: https://issues.apache.org/jira/browse/KAFKA-12349
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Jason Gustafson
>Priority: Major
>
> * Remove the compatibility shim between raft and the kip-500 controller
> * standardize on the epoch data type (probably int)
> * review partition epoch, leader epoch



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12276) Add KIP-500 controller code

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12276.
--
Fix Version/s: 2.8.0
   Resolution: Fixed

> Add KIP-500 controller code
> ---
>
> Key: KAFKA-12276
> URL: https://issues.apache.org/jira/browse/KAFKA-12276
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
> Fix For: 2.8.0
>
>
> Add the KIP-500 controller code



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12282) KIP-500: Reconcile configuration variables between trunk and the KIP-500 branch

2021-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12282.
--
Fix Version/s: 2.8.0
   Resolution: Fixed

> KIP-500: Reconcile configuration variables between trunk and the KIP-500 
> branch
> ---
>
> Key: KAFKA-12282
> URL: https://issues.apache.org/jira/browse/KAFKA-12282
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Alok Nikhil
>Assignee: Alok Nikhil
>Priority: Minor
>  Labels: kip-500
> Fix For: 2.8.0
>
>
> Some config changes/additions were made on the KIP-500 branch that need to be 
> merged back into trunk.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kebab-mai-haddi opened a new pull request #10554: Expand AdjustStreamThreadCountTest by writing some data to Kafka topics

2021-04-17 Thread GitBox


kebab-mai-haddi opened a new pull request #10554:
URL: https://github.com/apache/kafka/pull/10554


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kebab-mai-haddi closed pull request #10432: KAFKA-12506

2021-04-17 Thread GitBox


kebab-mai-haddi closed pull request #10432:
URL: https://github.com/apache/kafka/pull/10432


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12680) Failed to restart the broker in kraft mode

2021-04-17 Thread Wenbing Shen (Jira)
Wenbing Shen created KAFKA-12680:


 Summary: Failed to restart the broker in kraft mode
 Key: KAFKA-12680
 URL: https://issues.apache.org/jira/browse/KAFKA-12680
 Project: Kafka
  Issue Type: Bug
Reporter: Wenbing Shen


I tested kraft mode for the first time today. I deployed a single-node kraft 
mode broker according to the documentation:

[https://github.com/apache/kafka/blob/6d1d68617ecd023b787f54aafc24a4232663428d/config/kraft/README.md]

 

First step: ./bin/kafka-storage.sh random-uuid


Second step: use the uuid generated above to execute the following command:

./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties

 

Third step: ./bin/kafka-server-start.sh ./config/kraft/server.properties

 

Then I created two topics with two partitions and a single replica.

./bin/kafka-topics.sh --create --topic test-01 --partitions 2 
--replication-factor 1 --bootstrap-server localhost:9092

Production and consumption worked without problems, but after I called 
kafka-server-stop.sh and then ran the start command again, the broker started 
to report an error.

I am not sure whether this is a known bug or a problem with my usage.

 

[2021-04-18 00:19:37,443] ERROR Exiting Kafka due to fatal exception 
(kafka.Kafka$)
java.io.IOException: Invalid argument
 at java.io.RandomAccessFile.setLength(Native Method)
 at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:189)
 at kafka.log.AbstractIndex.resize(AbstractIndex.scala:175)
 at kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:241)
 at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:241)
 at kafka.log.LogSegment.recover(LogSegment.scala:385)
 at kafka.log.Log.recoverSegment(Log.scala:741)
 at kafka.log.Log.recoverLog(Log.scala:894)
 at kafka.log.Log.$anonfun$loadSegments$2(Log.scala:816)
 at kafka.log.Log$$Lambda$153/391630194.apply$mcJ$sp(Unknown Source)
 at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17)
 at kafka.log.Log.retryOnOffsetOverflow(Log.scala:2456)
 at kafka.log.Log.loadSegments(Log.scala:816)
 at kafka.log.Log.<init>(Log.scala:326)
 at kafka.log.Log$.apply(Log.scala:2593)
 at kafka.raft.KafkaMetadataLog$.apply(KafkaMetadataLog.scala:358)
 at kafka.raft.KafkaRaftManager.buildMetadataLog(RaftManager.scala:253)
 at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:127)
 at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:74)
 at kafka.Kafka$.buildServer(Kafka.scala:79)
 at kafka.Kafka$.main(Kafka.scala:87)
 at kafka.Kafka.main(Kafka.scala)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wenbingshen opened a new pull request #10553: MINOR: Fix the negative time difference value from Log.scala

2021-04-17 Thread GitBox


wenbingshen opened a new pull request #10553:
URL: https://github.com/apache/kafka/pull/10553


   When I tested the kraft mode, I found that the time taken to load the 
snapshot in the startup log was negative.
   
   [2021-04-18 00:19:37,377] INFO [Log partition=@metadata-0, 
dir=/tmp/kraft-combined-logs] Recovering unflushed segment 0 (kafka.log.Log)
   [2021-04-18 00:19:37,382] INFO [Log partition=@metadata-0, 
dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message 
format version 2 (kafka.log.Log)
   [2021-04-18 00:19:37,389] INFO [Log partition=@metadata-0, 
dir=/tmp/kraft-combined-logs] Reloading from producer snapshot and rebuilding 
producer state from offset 0 (kafka.log.Log)
   [2021-04-18 00:19:37,393] INFO [Log partition=@metadata-0, 
dir=/tmp/kraft-combined-logs] Producer state recovery took **-2ms** for 
snapshot load and 1ms for segment recovery from offset 0 (kafka.log.Log)
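
   For illustration, one common way such a negative duration can arise is 
computing the elapsed time from a timestamp that was not captured at the start 
of the measured phase; this generic sketch (not necessarily the actual 
Log.scala bug) shows the safe pattern:
   
   ```java
   public class ElapsedTimeSketch {
       public static void main(String[] args) throws InterruptedException {
           // Capture the start immediately before the work being timed, and
           // measure each phase against its own start instant so the
           // difference cannot come out negative.
           long snapshotLoadStart = System.nanoTime();
           Thread.sleep(2);  // stand-in for loading the producer snapshot
           long snapshotLoadMs = (System.nanoTime() - snapshotLoadStart) / 1_000_000;
   
           long segmentRecoveryStart = System.nanoTime();
           Thread.sleep(1);  // stand-in for segment recovery
           long segmentRecoveryMs = (System.nanoTime() - segmentRecoveryStart) / 1_000_000;
   
           System.out.println("Producer state recovery took " + snapshotLoadMs
                   + "ms for snapshot load and " + segmentRecoveryMs
                   + "ms for segment recovery");
       }
   }
   ```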
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration (`testRuntime` -->> `testRuntimeClasspath`)

2021-04-17 Thread GitBox


dejan2609 commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-821841765


   OK, I finished my write-ups above (I just had to be as detailed as 
possible; these comments help me a lot).
   
   > Could we reintroduce a configuration that behaves the same as 
`testRuntime` did before it was removed?
   
   Well, I can try to revert to a previous solution, see how that goes, and 
build from there.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration (`testRuntime` -->> `testRuntimeClasspath`)

2021-04-17 Thread GitBox


dejan2609 commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-821841078


   So, I started to abandon the quick-and-dirty solution and to lean towards 
something heavier (i.e. some form of refactoring):
   
- 
https://discuss.gradle.org/t/gradle-support-for-cyclic-dependencies/14355/2
- 
https://stackoverflow.com/questions/38062841/how-to-resolve-circular-dependency-in-gradle/38063421#38063421
- 
https://stackoverflow.com/questions/58178381/resolve-circular-dependency-in-gradle/58186686#58186686
   
Gradle introduced a new thing (among many others): the `java-test-fixtures` 
plugin (which goes hand in hand with `java-library`) that allows fine-grained 
test-related dependency configurations:
   - 
https://docs.gradle.org/6.8.3/userguide/java_testing.html#declaring_dependencies_of_test_fixtures
 (some browsers show the page bottom... in that case scroll up to **_"Declaring 
dependencies of test fixtures"_**)
   - 
https://stackoverflow.com/questions/5644011/multi-project-test-dependencies-with-gradle/60138176#60138176
 
   - 
https://stackoverflow.com/questions/64133013/gradle-test-fixtures-plugin-and-core-module-dependencies
 
   - 
https://stackoverflow.com/questions/5144325/gradle-test-dependency/66138658#66138658


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma edited a comment on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration (`testRuntime` -->> `testRuntimeClasspath`)

2021-04-17 Thread GitBox


ijuma edited a comment on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-821839178


   Could we reintroduce a configuration that behaves the same as `testRuntime` 
did before it was removed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration (`testRuntime` -->> `testRuntimeClasspath`)

2021-04-17 Thread GitBox


ijuma commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-821839178


   Could we reintroduce a configuration that behaved the same as `testRuntime`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 edited a comment on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration (`testRuntime` -->> `testRuntimeClasspath`)

2021-04-17 Thread GitBox


dejan2609 edited a comment on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-821837798


   Digging through the project history, I managed to find some commits that 
seem to be related (see the comment):
   
   ```
   project(':streams') {
 dependencies {
   api project(':clients')
   
   // testCompileOnly prevents streams from exporting a dependency on 
test-utils, which would cause a dependency cycle
   testCompileOnly project(':streams:test-utils')
   
   
   testRuntimeOnly project(':streams:test-utils')
 }
   
   }
   ```
   @vvcephei contributed two PRs (#4821 and #4812) 3 years ago and these are 
related commits:
   - **05. Apr 2018**. 
https://github.com/apache/kafka/commit/d5db4e9b8055536bdbb3b2d2308e36efa4abafcc
   - **23. Apr 2018.** 
https://github.com/apache/kafka/commit/ed51b2cdf5bdac210a6904bead1a2ca6e8411406
   
   Out of curiosity I tried to consolidate these dependencies into one 
`testImplementation project(':streams:test-utils')` and here are the results:
- on top of trunk: build works fine ✔️ 
- on top of this PR: build fails ❌ (with the same `Circular dependency 
between the following tasks` error as above)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 edited a comment on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration (`testRuntime` -->> `testRuntimeClasspath`)

2021-04-17 Thread GitBox


dejan2609 edited a comment on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-821837798


   Digging through the project history, I managed to find some commits that 
seem to be related (see the comment):
   
   ```
   project(':streams') {
 dependencies {
   api project(':clients')
   
   // testCompileOnly prevents streams from exporting a dependency on 
test-utils, which would cause a dependency cycle
   testCompileOnly project(':streams:test-utils')
   
   
   testRuntimeOnly project(':streams:test-utils')
 }
   
   }
   ```
   @vvcephei contributed two PRs (#4821 and #4812) 3 years ago and these are 
related commits:
   - **05. Apr 2018**. 
https://github.com/apache/kafka/commit/d5db4e9b8055536bdbb3b2d2308e36efa4abafcc
   - **23. Apr 2018.** 
https://github.com/apache/kafka/commit/ed51b2cdf5bdac210a6904bead1a2ca6e8411406
   
   Out of curiosity I tried to consolidate these dependencies into one 
`testImplementation project(':streams:test-utils')` and here are the results:
- on top of trunk: build works fine ✔️ 
- on top of this PR: build fails ❌ (with the same `Circular dependency 
between the following tasks` error as above)
   
   (...moving to phase 3...)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration (`testRuntime` -->> `testRuntimeClasspath`)

2021-04-17 Thread GitBox


dejan2609 commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-821837798


   Digging through the project history, I managed to find some commits that 
seem to be related (see the comment):
   
   ```
   project(':streams') {
     dependencies {
       api project(':clients')

       // testCompileOnly prevents streams from exporting a dependency on
       // test-utils, which would cause a dependency cycle
       testCompileOnly project(':streams:test-utils')

       testRuntimeOnly project(':streams:test-utils')
     }
   }
   ```
   @vvcephei contributed two PRs (#4821 and #4812) 3 years ago and these are 
related commits:
   - **05. Apr 2018**. 
https://github.com/apache/kafka/commit/d5db4e9b8055536bdbb3b2d2308e36efa4abafcc
   - **23. Apr 2018.** 
https://github.com/apache/kafka/commit/ed51b2cdf5bdac210a6904bead1a2ca6e8411406
   
   Out of curiosity, I tried to consolidate these dependencies into a single 
`testImplementation project(':streams:test-utils')` and here are the results:
- on top of trunk: build works fine ✔️ 
- on top of this PR: build fails ❌ (with the same `Circular dependency 
between the following tasks` error as above)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12660) Do not update offset commit sensor after append failure

2021-04-17 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming reassigned KAFKA-12660:
--

Assignee: dengziming

> Do not update offset commit sensor after append failure
> ---
>
> Key: KAFKA-12660
> URL: https://issues.apache.org/jira/browse/KAFKA-12660
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: dengziming
>Priority: Major
>
> In the append callback after writing an offset to the log in 
> `GroupMetadataManager`, it seems wrong to update the offset commit sensor 
> prior to checking for errors: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L394.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10783) Rewrite TopicPartitionStateZNode struct with auto-generated protocol

2021-04-17 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming resolved KAFKA-10783.

Resolution: Won't Do

> Rewrite TopicPartitionStateZNode struct with auto-generated protocol
> 
>
> Key: KAFKA-10783
> URL: https://issues.apache.org/jira/browse/KAFKA-10783
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration (`testRuntime` -->> `testRuntimeClasspath`)

2021-04-17 Thread GitBox


dejan2609 commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-821831500


   Also, after some digging I stumbled upon these GitHub resources (I will 
intentionally leave them formatted as code):
   - `https://github.com/gradle/gradle/issues/6353`
   - `https://github.com/gradle/gradle/issues/847` (especially comment: 
`https://github.com/gradle/gradle/issues/847#issuecomment-287515195`):
   > Came across this and was looking for a workaround...so in case anyone else 
is trying to find a way out: it seems sufficient to change `group` for the 
subprojects such that they end up with different module coordinates
   
   I tested this quick and dirty solution (experimented with changing `group` 
for subprojects) but that action yielded no results...  
   
   Also, I played around with dependencies and tasks:
- ./gradlew streams:dependencies > DEPS.txt
- ./gradlew tiTree  :streams:copyDependantLibs   > TasksGraph.txt
- ./gradlew tiOrder :streams:copyDependantLibs > TasksOrder.txt 
(an appropriate plugin was added in order to create the Gradle task graph and 
task order listings).
   
   So, after some time, I moved on to phase two.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10466: MINOR: Gradle upgrade: 6.8.3 -->> 7.0-rc-2 [work in progress]

2021-04-17 Thread GitBox


dejan2609 commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-821826801


   I am going to split my findings into separate comments... here it goes.
   
   I decided to work with Gradle 6.8.3 for the moment (that is, until the 
circular dependency issue is resolved). After that we can try with Gradle 7.0.
   
   So, I am going to change the title and align it with the corresponding JIRA 
ticket that was mentioned above: https://issues.apache.org/jira/browse/KAFKA-12417
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ketulgupta1995 commented on pull request #10519: KAFKA-12344 Support SlidingWindows in the Scala API

2021-04-17 Thread GitBox


ketulgupta1995 commented on pull request #10519:
URL: https://github.com/apache/kafka/pull/10519#issuecomment-821793666


   Hi @lct45, I removed the nits and fixed the checkstyle issues.
   Please review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-17 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17324204#comment-17324204
 ] 

Luke Chen commented on KAFKA-12675:
---

PR: [https://github.com/apache/kafka/pull/10552]

[~twmb], I couldn't find you on the Kafka GitHub; you're welcome to review. Thank you.

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, we have the "general assignor" for the non-equal subscription case 
> and the "constrained assignor" for the all-equal subscription case. There's a 
> performance test for the constrained assignor with:
> topicCount = 500;
>  partitionCount = 2000; 
>  consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription,_ total 1 million 
> partitions, and we can complete the assignment within 2 seconds on my machine.
> However, if we let 1 of the consumers subscribe to only 1 topic, it'll use the 
> "general assignor", and the result with the same settings as above is: 
> *OutOfMemory.* 
> Even if we lower the counts to:
> topicCount = 50;
>  partitionCount = 1000; 
>  consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
>  partitionCount = 800; 
>  consumerCount = 800;
> we can complete in 10 seconds on my machine, which is still slow.
>  
> Since we are going to set the default assignment strategy to 
> "CooperativeStickyAssignor" soon, we should improve the scalability and 
> performance of the sticky general assignor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-17 Thread GitBox


showuon commented on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-821792471


   @ableegoldman , please help review this PR. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on a change in pull request #10551: MINOR:Remove nonsense test line from TopicCommandTest

2021-04-17 Thread GitBox


wenbingshen commented on a change in pull request #10551:
URL: https://github.com/apache/kafka/pull/10551#discussion_r615229878



##
File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -736,7 +735,6 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
       topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
     val rows = output.split("\n")
     assertEquals(2, rows.size)
-    rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")

Review comment:
   Thanks for your comment. Due to the introduction of topicId, this assertion 
would fail. We can't get the topicId in advance, so we can use 
`assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-17 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615229673



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
##
@@ -453,6 +453,43 @@ public void testLargeAssignmentAndGroupWithUniformSubscription() {
         assignor.assign(partitionsPerTopic, subscriptions);
     }
 
+    @Timeout(120)
+    @Test
+    public void testLargeAssignmentAndGroupWithNonEqualSubscription() {
+        // 1 million partitions!
+        int topicCount = 500;
+        int partitionCount = 2_000;
+        int consumerCount = 2_000;
+
+        List<String> topics = new ArrayList<>();
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 0; i < topicCount; i++) {
+            String topicName = getTopicName(i, topicCount);
+            topics.add(topicName);
+            partitionsPerTopic.put(topicName, partitionCount);
+        }
+        for (int i = 0; i < consumerCount; i++) {
+            if (i == consumerCount - 1) {
+                subscriptions.put(getConsumerName(i, consumerCount), new Subscription(topics.subList(0, 1)));

Review comment:
   Subscribe to only 1 topic for the last consumer, so the subscriptions are 
non-equal and the general assignor path is exercised.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-17 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615229332



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -527,12 +615,23 @@ private int getBalanceScore(Map<String, List<TopicPartition>> assignment) {
      * Sort valid partitions so they are processed in the potential reassignment phase in the proper order
      * that causes minimal partition movement among consumers (hence honoring maximal stickiness)
      *
-     * @param partition2AllPotentialConsumers a mapping of partitions to their potential consumers
+     * @param topic2AllPotentialConsumers a mapping of topics to their potential consumers
+     * @param partitionsPerTopic The number of partitions for each subscribed topic
      * @return an ascending sorted list of topic partitions based on how many consumers can potentially use them
      */
-    private List<TopicPartition> sortPartitions(Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {
-        List<TopicPartition> sortedPartitions = new ArrayList<>(partition2AllPotentialConsumers.keySet());
-        Collections.sort(sortedPartitions, new PartitionComparator(partition2AllPotentialConsumers));
+    private List<TopicPartition> sortPartitions(Map<String, List<String>> topic2AllPotentialConsumers,
+                                                Map<String, Integer> partitionsPerTopic) {
+        List<TopicPartition> sortedPartitions = new ArrayList<>();
+        List<String> allTopics = new ArrayList<>(topic2AllPotentialConsumers.keySet());
+        Collections.sort(allTopics, new TopicComparator(topic2AllPotentialConsumers));
+
+        // since allTopics are sorted, we can loop through allTopics to create the sortedPartitions

Review comment:
   refactor 4: To build the `sortedPartitions` list, we used to sort all of the 
partitions. To improve this, I sort all topics first (only 500 topics to sort, 
compared to the original 1 million partitions to sort), and then add the 
partitions by looping over the sorted topics. (small improvement)
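   
   For illustration, here is a minimal self-contained sketch of this idea 
(hypothetical stand-in names and data, not the actual `AbstractStickyAssignor` 
code):
   ```
   import java.util.*;
   
   public class SortByTopicSketch {
       // Stand-in for Kafka's TopicPartition, to keep the sketch self-contained.
       record TopicPartition(String topic, int partition) { }
   
       public static void main(String[] args) {
           // Hypothetical inputs: topic -> potential consumers, topic -> partition count.
           Map<String, List<String>> topic2AllPotentialConsumers = Map.of(
               "t1", List.of("c1", "c2"),
               "t2", List.of("c1"));
           Map<String, Integer> partitionsPerTopic = Map.of("t1", 3, "t2", 2);
   
           // Sort the (few) topics by how many consumers could take them...
           List<String> allTopics = new ArrayList<>(topic2AllPotentialConsumers.keySet());
           allTopics.sort(Comparator.comparingInt(t -> topic2AllPotentialConsumers.get(t).size()));
   
           // ...then expand each topic into its partitions in that order,
           // instead of sorting millions of partitions directly.
           List<TopicPartition> sortedPartitions = new ArrayList<>();
           for (String topic : allTopics) {
               for (int p = 0; p < partitionsPerTopic.get(topic); p++) {
                   sortedPartitions.add(new TopicPartition(topic, p));
               }
           }
           System.out.println(sortedPartitions); // t2's partitions first (fewer potential consumers)
       }
   }
   ```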




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-17 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615229166



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -387,58 +398,121 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
+        int totalPartitionCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
         balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired);
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionCount);
+
         return currentAssignment;
     }
 
+    /**
+     * get the unassigned partition list by computing the difference set of sortedPartitions (all partitions)
+     * and toBeRemovedPartitions. We use the two-pointer technique here:
+     *
+     * We loop through sortedPartitions and compare against the ith element of the sorted toBeRemovedPartitions (i starts from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the ith element, get the next element from toBeRemovedPartitions
+     *
+     * @param sortedPartitions: sorted list of all partitions
+     * @param toBeRemovedPartitions: sorted partitions, all included in sortedPartitions
+     * @return the partitions not assigned to any current consumer
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,
+                                                         List<TopicPartition> toBeRemovedPartitions) {
+        List<TopicPartition> unassignedPartitions = new ArrayList<>();
+
+        int index = 0;
+        boolean shouldAddDirectly = false;
+        int sizeToBeRemovedPartitions = toBeRemovedPartitions.size();
+        TopicPartition nextPartition = toBeRemovedPartitions.get(index);
+        for (TopicPartition topicPartition : sortedPartitions) {
+            if (shouldAddDirectly || !nextPartition.equals(topicPartition)) {
+                unassignedPartitions.add(topicPartition);
+            } else {
+                // equal case, don't add to unassignedPartitions, just get the next partition
+                if (index < sizeToBeRemovedPartitions - 1) {
+                    nextPartition = toBeRemovedPartitions.get(++index);
+                } else {
+                    // add the remaining partitions directly since there are no more toBeRemovedPartitions
+                    shouldAddDirectly = true;
+                }
+            }
+        }
+        return unassignedPartitions;
+    }
+
+    /**
+     * update the prevAssignment with the partitions, consumer and generation in parameters
+     *
+     * @param partitions: The partitions used to update prevAssignment
+     * @param consumer: The consumer Id
+     * @param prevAssignment: The assignment contains the assignment with the 2nd largest generation
+     * @param generation: The generation of this assignment (partitions)
+     */
+    private void updatePrevAssignment(Map<TopicPartition, ConsumerGenerationPair> prevAssignment,
+                                      List<TopicPartition> partitions,
+                                      String consumer,
+                                      int generation) {
+        for (TopicPartition partition : partitions) {
+            ConsumerGenerationPair consumerGeneration = prevAssignment.get(partition);
+            if (consumerGeneration != null) {
+                // only keep the latest previous assignment
+                if (generation > consumerGeneration.generation)
+                    prevAssignment.put(partition, new ConsumerGenerationPair(consumer, generation));
+            } else {
+                prevAssignment.put(partition, new ConsumerGenerationPair(consumer, generation));
+            }
+        }
+    }
+
+    /**
+     * filling in the currentAssignment and prevAssignment from the subscriptions.
+     *
+     * @param subscriptions: Map from the member id to their respective topic subscription
+     * @param currentAssignment: The assignment contains the assignments with the largest generation
+     * @param prevAssignment: The assignment contains the assignment with the 2nd largest generation
+     */
     private void prepopulateCurrentAssignments(Map<String, Subscription> subscriptions,
                                                Map<String, List<TopicPartition>> currentAssignment,
                                                Map<TopicPartition, ConsumerGenerationPair> prevAssignment) {
         // we need to process subscriptions' user data with each consumer's reported generation in mind
         // higher generations overwrite lower generations in case of a conflict
         // note 

[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-17 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615228815



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -362,23 +360,36 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 // otherwise (the consumer still exists)
                 for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {
                     TopicPartition partition = partitionIter.next();
-                    if (!partition2AllPotentialConsumers.containsKey(partition)) {
+                    if (!topic2AllPotentialConsumers.containsKey(partition.topic())) {
                         // if this topic partition of this consumer no longer exists remove it from currentAssignment of the consumer
                         partitionIter.remove();
                         currentPartitionConsumer.remove(partition);
-                    } else if (!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {
+                    } else if (!consumerSubscription.topics().contains(partition.topic())) {
                         // if this partition cannot remain assigned to its current consumer because the consumer
                         // is no longer subscribed to its topic remove it from currentAssignment of the consumer
                         partitionIter.remove();
                         revocationRequired = true;
-                    } else
+                    } else {
                         // otherwise, remove the topic partition from those that need to be assigned only if
                         // its current consumer is still subscribed to its topic (because it is already assigned
                         // and we would want to preserve that assignment as much as possible)
-                        unassignedPartitions.remove(partition);
+                        toBeRemovedPartitions.add(partition);
+                    }
                 }
             }
         }
+
+        // all partitions that need to be assigned
+        List<TopicPartition> unassignedPartitions;
+
+        if (!toBeRemovedPartitions.isEmpty()) {
+            Collections.sort(toBeRemovedPartitions, new PartitionComparator(topic2AllPotentialConsumers));
+            unassignedPartitions = getUnassignedPartitions(sortedPartitions, toBeRemovedPartitions);
+        } else {
+            unassignedPartitions = sortedPartitions;

Review comment:
   We use `unassignedPartitions` and `sortedPartitions` as the base lists, so we 
make them refer to the same list to save memory in the brand-new assignment case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-17 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615228600



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -387,58 +398,121 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
+        int totalPartitionCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
         balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired);
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionCount);
+
         return currentAssignment;
     }
 
+    /**
+     * get the unassigned partition list by computing the difference set of sortedPartitions (all partitions)
+     * and toBeRemovedPartitions. We use the two-pointer technique here:
+     *
+     * We loop through sortedPartitions and compare against the ith element of the sorted toBeRemovedPartitions (i starts from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the ith element, get the next element from toBeRemovedPartitions
+     *
+     * @param sortedPartitions: sorted list of all partitions
+     * @param toBeRemovedPartitions: sorted partitions, all included in sortedPartitions
+     * @return the partitions not assigned to any current consumer
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,

Review comment:
   refactor 2:
   We used to have an ArrayList of `unassignedPartitions` containing all sorted 
partitions (ex: 1 million partitions), and we looped through the current 
assignment to remove the already-assigned partitions, ex: 999,000 of them, so 
only 1000 partitions would be left. However, ArrayList element removal is 
pretty slow for a huge list because it needs to find the element first, and 
then arrayCopy the remaining elements into an array of size (originalSize - 1). 
This case should happen a lot, since on each rebalance we usually only have a 
small set of changes (ex: 1 consumer dropped), so this is an important 
improvement.
   
   To refactor it, I used the two-pointer technique to loop through the 2 sorted 
lists, `sortedPartitions` and `toBeRemovedPartitions`, and only add the 
difference set of the 2 lists.
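   
   As an illustration, a minimal generic sketch of this two-pointer difference 
computation (a hypothetical helper, not the actual PR code):
   ```
   import java.util.*;
   
   public class TwoPointerDiffSketch {
       // Computes sortedAll \ sortedRemoved in a single pass, assuming both lists
       // are sorted by the same comparator and sortedRemoved is a subset of sortedAll.
       static <T> List<T> difference(List<T> sortedAll, List<T> sortedRemoved) {
           List<T> result = new ArrayList<>(sortedAll.size() - sortedRemoved.size());
           int i = 0; // pointer into sortedRemoved
           for (T item : sortedAll) {
               if (i < sortedRemoved.size() && item.equals(sortedRemoved.get(i))) {
                   i++; // matched an element to remove: skip it and advance the pointer
               } else {
                   result.add(item); // O(1) append, no O(n) ArrayList.remove()
               }
           }
           return result;
       }
   
       public static void main(String[] args) {
           List<Integer> all = List.of(1, 2, 3, 4, 5, 6);
           List<Integer> toRemove = List.of(2, 5);
           System.out.println(difference(all, toRemove)); // prints [1, 3, 4, 6]
       }
   }
   ```
   This runs in O(n) overall, versus roughly O(n * m) for removing m elements 
one by one from an ArrayList of n elements.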




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-17 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615227866



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -313,26 +312,24 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 
         prepopulateCurrentAssignments(subscriptions, currentAssignment, prevAssignment);
 
-        // a mapping of all topic partitions to all consumers that can be assigned to them
-        final Map<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>();
-        // a mapping of all consumers to all potential topic partitions that can be assigned to them
-        final Map<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>();
+        // a mapping of all topics to all consumers that can be assigned to them
+        final Map<String, List<String>> topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.keySet().size());
+        // a mapping of all consumers to all potential topics that can be assigned to them
+        final Map<String, List<String>> consumer2AllPotentialTopics = new HashMap<>(subscriptions.keySet().size());
 
-        // initialize partition2AllPotentialConsumers and consumer2AllPotentialPartitions in the following two for loops
+        // initialize topic2AllPotentialConsumers and consumer2AllPotentialTopics in the following two for loops
         for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
             for (int i = 0; i < entry.getValue(); ++i)
-                partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<>());
+                topic2AllPotentialConsumers.put(entry.getKey(), new ArrayList<>());
         }
 
         for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
             String consumerId = entry.getKey();
-            consumer2AllPotentialPartitions.put(consumerId, new ArrayList<>());
+            List<String> subscribedTopics = new ArrayList<>(entry.getValue().topics().size());
+            consumer2AllPotentialTopics.put(consumerId, subscribedTopics);
             entry.getValue().topics().stream().filter(topic -> partitionsPerTopic.get(topic) != null).forEach(topic -> {
-                for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
-                    TopicPartition topicPartition = new TopicPartition(topic, i);
-                    consumer2AllPotentialPartitions.get(consumerId).add(topicPartition);
-                    partition2AllPotentialConsumers.get(topicPartition).add(consumerId);

Review comment:
   refactor 1:
   We used to have 2 maps, `consumer2AllPotentialPartitions` and 
`partition2AllPotentialConsumers`. But that would need a lot of memory here: 
`partition2AllPotentialConsumers` would need 1 million map entries, each 
holding a list of 2000 consumers (supposing 1 million partitions and 2000 
consumers). Actually, we only need to store the potential consumers per topic 
and the potential topics per consumer, so I changed the maps to 
`topic2AllPotentialConsumers` and `consumer2AllPotentialTopics`. This saves 
both memory and time.
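   
   For a rough sense of the difference under the numbers above, a hypothetical 
stand-in sketch (not the PR code itself):
   ```
   import java.util.*;
   
   public class TopicLevelMapsSketch {
       public static void main(String[] args) {
           int topicCount = 500, partitionsEach = 2_000; // 1 million partitions total
           List<String> consumers = List.of("c1", "c2"); // stand-in consumer ids
   
           // One entry per *topic* (500 keys). Every partition of a topic has the
           // same potential consumers, so per-partition entries (1,000,000 keys,
           // each holding its own consumer list) carry no extra information.
           Map<String, List<String>> topic2AllPotentialConsumers = new HashMap<>(topicCount);
           for (int t = 0; t < topicCount; t++) {
               topic2AllPotentialConsumers.put("topic-" + t, new ArrayList<>(consumers));
           }
           System.out.println("map entries: " + topic2AllPotentialConsumers.size()
               + " (topic-level) vs " + (topicCount * partitionsEach) + " (partition-level)");
       }
   }
   ```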




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-17 Thread GitBox


showuon opened a new pull request #10552:
URL: https://github.com/apache/kafka/pull/10552


   I did a code refactor in this PR, keeping the same algorithm.
   I've achieved the following:
   1. Originally, with this setting:
   ```
   topicCount = 50;
   partitionCount = 800;
   consumerCount = 800;
   ```
   we completed the assignment in 10 seconds; after my code refactor, the time 
is down to 200 ms.
   
   2. With the 1 million partitions setting:
   ```
   topicCount = 500;
   partitionCount = 2000;
   consumerCount = 2000;
   ```
   no OutOfMemory is thrown anymore, and the assignment takes 5 seconds.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10551: MINOR:Remove nonsense test line from TopicCommandTest

2021-04-17 Thread GitBox


chia7712 commented on a change in pull request #10551:
URL: https://github.com/apache/kafka/pull/10551#discussion_r615220275



##
File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -736,7 +735,6 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
       topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
     val rows = output.split("\n")
     assertEquals(2, rows.size)
-    rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")

Review comment:
   Maybe 
`assertTrue(rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1"))`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on pull request #10377: KAFKA-12515 ApiVersionManager should create response based on request version

2021-04-17 Thread GitBox


feyman2016 commented on pull request #10377:
URL: https://github.com/apache/kafka/pull/10377#issuecomment-821774667


   Hi, I checked locally; all of the failed tests should be unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() | 18 sec | 1
   Build / JDK 15 and Scala 2.13 / org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryOnlyActivePartitionStoresByDefault | 10 sec | 1
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication() | 2 min 13 sec | 1
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition() | 1 min 21 sec | 1
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync() | 1 min 20 sec | 1
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication() | 1 min 19 sec | 1
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplicationWithEmptyPartition() | 1 min 25 sec | 1
   Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org