[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2022-02-20 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r810857967



##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
##
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ClientTagAwareStandbyTaskAssignorTest {
+    private static final String ZONE_TAG = "zone";
+    private static final String CLUSTER_TAG = "cluster";
+
+    private static final String ZONE_1 = "zone1";
+    private static final String ZONE_2 = "zone2";
+    private static final String ZONE_3 = "zone3";
+
+    private static final String CLUSTER_1 = "cluster1";
+    private static final String CLUSTER_2 = "cluster2";
+    private static final String CLUSTER_3 = "cluster3";
+
+    private static final UUID UUID_1 = uuidForInt(1);
+    private static final UUID UUID_2 = uuidForInt(2);
+    private static final UUID UUID_3 = uuidForInt(3);
+    private static final UUID UUID_4 = uuidForInt(4);
+    private static final UUID UUID_5 = uuidForInt(5);
+    private static final UUID UUID_6 = uuidForInt(6);
+    private static final UUID UUID_7 = uuidForInt(7);
+    private static final UUID UUID_8 = uuidForInt(8);
+    private static final UUID UUID_9 = uuidForInt(9);
+
+    @Test
+    public void shouldPermitTaskMovementWhenClientTagsMatch() {
+        final ClientTagAwareStandbyTaskAssignor standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
+        final ClientState source = createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)));
+        final ClientState destination = createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)));
+
+        assertTrue(standbyTaskAssignor.isAllowedTaskMovement(source, destination));
+    }
+
+    @Test
+    public void shouldDeclineTaskMovementWhenClientTagsDoNotMatch() {
+        final ClientTagAwareStandbyTaskAssignor standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
+        final ClientState source = createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)));
+        final ClientState destination = createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_1)));
+
+        assertFalse(standbyTaskAssignor.isAllowedTaskMovement(source, destination));
+    }
+
+    @Test
+    public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
+        final Map<UUID, ClientState> clientStates = mkMap(
+            mkEntry(UUID_1, createClientStateWithCapacity(2, mkMap(mkEntr
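The tag-matching rule these two tests exercise can be sketched as follows. This is a hypothetical standalone helper for illustration, not the actual `ClientTagAwareStandbyTaskAssignor` implementation: movement between clients is permitted only when their client tags are identical.

```java
import java.util.Map;
import java.util.Objects;

public class TagMatchSketch {
    // Movement is allowed only when source and destination carry the same client tags.
    static boolean isAllowedTaskMovement(Map<String, String> sourceTags,
                                         Map<String, String> destinationTags) {
        return Objects.equals(sourceTags, destinationTags);
    }

    public static void main(String[] args) {
        Map<String, String> zone1 = Map.of("zone", "zone1", "cluster", "cluster1");
        Map<String, String> zone2 = Map.of("zone", "zone2", "cluster", "cluster1");
        if (!isAllowedTaskMovement(zone1, zone1))
            throw new AssertionError("identical tags should permit movement");
        if (isAllowedTaskMovement(zone1, zone2))
            throw new AssertionError("differing tags should decline movement");
        System.out.println("ok");
    }
}
```

This mirrors the shape of the two tests above: same zone and cluster tags permit the movement, a differing zone tag declines it.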

[jira] [Updated] (KAFKA-13517) Update Admin::describeConfigs to allow fetching specific configurations

2022-02-20 Thread Vikas Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vikas Singh updated KAFKA-13517:

Description: 
A list of {{ConfigResource}} objects is passed as an argument to the {{AdminClient::describeConfigs}} API to indicate which entities' configuration to fetch. The {{ConfigResource}} class is made up of two fields: the name and the type of the entity. Kafka returns *all* configurations for the entities provided to the admin client API.

This admin API in turn uses the {{DescribeConfigsRequest}} Kafka API to get the configuration for the entities in question. In addition to the name and type of the entity whose configuration to get, the Kafka {{DescribeConfigsResource}} structure also lets users provide a {{ConfigurationKeys}} list, which allows users to fetch only the configurations that are needed.

However, this field isn't exposed in the {{ConfigResource}} class used by AdminClient, so users of AdminClient have no way to ask for specific configurations. The API always returns *all* configurations, and the user of {{AdminClient::describeConfigs}} must then go over the returned list and filter out the config keys they are interested in.

This results in boilerplate code for all users of the {{AdminClient::describeConfigs}} API, in addition to being a wasteful use of resources. It becomes painful in large clusters, where fetching one configuration of all topics requires fetching all configurations of all topics, which can be huge in size.

Creating this Jira to track the changes proposed in KIP-823: 
https://confluentinc.atlassian.net/wiki/spaces/~953215099/pages/2707357918/KIP+Admin+describeConfigs+should+allow+fetching+specific+configurations
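The client-side boilerplate the issue describes can be sketched as follows. This is a hypothetical illustration (the helper and the sample config keys are made up, and a plain map stands in for the result of `AdminClient::describeConfigs`): the broker returns every configuration entry, and each caller must filter down to the keys it actually wanted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class DescribeConfigsFilterSketch {
    // Filter a full config map down to the requested keys, as every
    // caller of describeConfigs must do today on the client side.
    static Map<String, String> filterConfigs(Map<String, String> allConfigs,
                                             Set<String> wantedKeys) {
        return allConfigs.entrySet().stream()
                .filter(e -> wantedKeys.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        // Stand-in for the full per-topic config the broker returns.
        Map<String, String> all = new HashMap<>();
        all.put("retention.ms", "604800000");
        all.put("cleanup.policy", "delete");
        all.put("segment.bytes", "1073741824");

        Map<String, String> wanted = filterConfigs(all, Set.of("retention.ms"));
        if (wanted.size() != 1 || !wanted.containsKey("retention.ms"))
            throw new AssertionError("filter should keep only requested keys");
        System.out.println("ok");
    }
}
```

KIP-823 proposes pushing this filtering to the broker so only the requested keys cross the wire.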

  was:
A list of {{ConfigResource}} class is passed as argument to 
{{AdminClient::describeConfigs}} api to indicate configuration of the entities 
to fetch. The {{ConfigResource}} class is made up of two fields, name and type 
of entity. Kafka returns *all* configurations for the entities provided to the 
admin client api.

This admin api in turn uses {{DescribeConfigsRequest}} kafka api to get the 
configuration for the entities in question. In addition to name and type of 
entity whose configuration to get, Kafka {{DescribeConfigsResource}} structure 
also lets users provide {{ConfigurationKeys}} list, which allows users to fetch 
only the configurations that are needed.

However, this field isn't exposed in the {{ConfigResource}} class that is used 
by AdminClient, so users of AdminClient have no way to ask for specific 
configuration. The API always returns *all* configurations. Then the user of 
the {{AdminClient::describeConfigs}} go over the returned list and filter out 
the config keys that they are interested in.

This results in boilerplate code for all users of 
{{AdminClient::describeConfigs}} api, in addition to  being wasteful use of 
resource. It becomes painful in large cluster case where to fetch one 
configuration of all topics, we need to fetch all configuration of all topics, 
which can be huge in size. 

Creating this Jira to add same field (i.e. {{{}ConfigurationKeys{}}}) to the 
{{ConfigResource}} structure to bring it to parity to 
{{DescribeConfigsResource}} Kafka API structure. There should be no backward 
compatibility issue as the field will be optional and will behave same way if 
it is not specified (i.e. by passing null to backend kafka api) 


> Update Admin::describeConfigs to allow fetching specific configurations
> ---
>
> Key: KAFKA-13517
> URL: https://issues.apache.org/jira/browse/KAFKA-13517
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.8.1, 3.0.0
>Reporter: Vikas Singh
>Assignee: Vikas Singh
>Priority: Major
>
> A list of {{ConfigResource}} class is passed as argument to 
> {{AdminClient::describeConfigs}} api to indicate configuration of the 
> entities to fetch. The {{ConfigResource}} class is made up of two fields, 
> name and type of entity. Kafka returns *all* configurations for the entities 
> provided to the admin client api.
> This admin api in turn uses {{DescribeConfigsRequest}} kafka api to get the 
> configuration for the entities in question. In addition to name and type of 
> entity whose configuration to get, Kafka {{DescribeConfigsResource}} 
> structure also lets users provide {{ConfigurationKeys}} list, which allows 
> users to fetch only the configurations that are needed.
> However, this field isn't exposed in the {{ConfigResource}} class that is 
> used by AdminClient, so users of AdminClient have no way to ask for specific 
> configuration. The API always returns *all* configurations. Then the user of 
> the {{AdminClient::describeConfigs}} go over the returned list and filter out 
> the config keys that they ar

[jira] [Updated] (KAFKA-13517) Update Admin::describeConfigs to allow fetching specific configurations

2022-02-20 Thread Vikas Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vikas Singh updated KAFKA-13517:

Summary: Update Admin::describeConfigs to allow fetching specific 
configurations  (was: Add ConfigurationKeys to ConfigResource class)

> Update Admin::describeConfigs to allow fetching specific configurations
> ---
>
> Key: KAFKA-13517
> URL: https://issues.apache.org/jira/browse/KAFKA-13517
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.8.1, 3.0.0
>Reporter: Vikas Singh
>Assignee: Vikas Singh
>Priority: Major
>
> A list of {{ConfigResource}} class is passed as argument to 
> {{AdminClient::describeConfigs}} api to indicate configuration of the 
> entities to fetch. The {{ConfigResource}} class is made up of two fields, 
> name and type of entity. Kafka returns *all* configurations for the entities 
> provided to the admin client api.
> This admin api in turn uses {{DescribeConfigsRequest}} kafka api to get the 
> configuration for the entities in question. In addition to name and type of 
> entity whose configuration to get, Kafka {{DescribeConfigsResource}} 
> structure also lets users provide {{ConfigurationKeys}} list, which allows 
> users to fetch only the configurations that are needed.
> However, this field isn't exposed in the {{ConfigResource}} class that is 
> used by AdminClient, so users of AdminClient have no way to ask for specific 
> configuration. The API always returns *all* configurations. Then the user of 
> the {{AdminClient::describeConfigs}} go over the returned list and filter out 
> the config keys that they are interested in.
> This results in boilerplate code for all users of 
> {{AdminClient::describeConfigs}} api, in addition to  being wasteful use of 
> resource. It becomes painful in large cluster case where to fetch one 
> configuration of all topics, we need to fetch all configuration of all 
> topics, which can be huge in size. 
> Creating this Jira to add same field (i.e. {{{}ConfigurationKeys{}}}) to the 
> {{ConfigResource}} structure to bring it to parity to 
> {{DescribeConfigsResource}} Kafka API structure. There should be no backward 
> compatibility issue as the field will be optional and will behave same way if 
> it is not specified (i.e. by passing null to backend kafka api) 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on pull request #10259: MINOR: Provide valid examples in README page.

2022-02-20 Thread GitBox


showuon commented on pull request #10259:
URL: https://github.com/apache/kafka/pull/10259#issuecomment-1046526614


   @kamalcph , thanks for the patch!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon merged pull request #10259: MINOR: Provide valid examples in README page.

2022-02-20 Thread GitBox


showuon merged pull request #10259:
URL: https://github.com/apache/kafka/pull/10259


   






[GitHub] [kafka] showuon commented on a change in pull request #10259: MINOR: Provide valid examples in README page.

2022-02-20 Thread GitBox


showuon commented on a change in pull request #10259:
URL: https://github.com/apache/kafka/pull/10259#discussion_r810818791



##
File path: tests/README.md
##
@@ -97,26 +97,26 @@ Examining CI run
 
 * Set BUILD_ID is travis ci's build id. E.g. build id is 169519874 for the 
following build
 ```bash
-https://travis-ci.org/apache/kafka/builds/169519874

Review comment:
   Agree to remove the unused travis CI doc.








[GitHub] [kafka] kamalcph commented on pull request #10259: MINOR: Provide valid examples in README page.

2022-02-20 Thread GitBox


kamalcph commented on pull request #10259:
URL: https://github.com/apache/kafka/pull/10259#issuecomment-1046512916


   @ijuma @showuon 
   Please take a look






[jira] [Commented] (KAFKA-12399) Deprecate Log4J Appender

2022-02-20 Thread Muthuveerappan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495330#comment-17495330
 ] 

Muthuveerappan commented on KAFKA-12399:


Just got to know - scheduled for April, as per 
[https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.2.0]

> Deprecate Log4J Appender
> 
>
> Key: KAFKA-12399
> URL: https://issues.apache.org/jira/browse/KAFKA-12399
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> As a following job of KAFKA-9366, we have to entirely remove the log4j 1.2.7 
> dependency from the classpath by removing dependencies on log4j-appender.





[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2022-02-20 Thread Muthuveerappan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495329#comment-17495329
 ] 

Muthuveerappan commented on KAFKA-9366:
---

Thanks :)

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  





[GitHub] [kafka] showuon commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

2022-02-20 Thread GitBox


showuon commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r810748411



##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -997,6 +1002,152 @@ public void testAssignWithStandbyReplicas() {
 assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost());
 }
 
+@Test
+public void testAssignWithStandbyReplicasBalanceSparse() {
+    builder.addSource(null, "source1", null, null, null, "topic1");
+    builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+    builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+    final List<String> topics = asList("topic1");
+
+    createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+    adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+        singletonList(APPLICATION_ID + "-store1-changelog"),
+        singletonList(3))
+    );
+    configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+    final List<String> client1Consumers = asList("consumer10", "consumer11", "consumer12", "consumer13");
+    final List<String> client2Consumers = asList("consumer20", "consumer21", "consumer22");
+
+    for (final String consumerId : client1Consumers) {
+        subscriptions.put(consumerId,
+            new Subscription(
+                topics,
+                getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+    }
+    for (final String consumerId : client2Consumers) {
+        subscriptions.put(consumerId,
+            new Subscription(
+                topics,
+                getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+    }
+
+    final Map<String, Assignment> assignments =
+        partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+    // Consumers
+    final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+    final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+    final AssignmentInfo info12 = AssignmentInfo.decode(assignments.get("consumer12").userData());
+    final AssignmentInfo info13 = AssignmentInfo.decode(assignments.get("consumer13").userData());
+    final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+    final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData());
+    final AssignmentInfo info22 = AssignmentInfo.decode(assignments.get("consumer22").userData());
+
+    // Check each consumer has no more than 1 task
+    assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 1);
+    assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 1);
+    assertTrue(info12.activeTasks().size() + info12.standbyTasks().size() <= 1);
+    assertTrue(info13.activeTasks().size() + info13.standbyTasks().size() <= 1);
+    assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 1);
+    assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 1);
+    assertTrue(info22.activeTasks().size() + info22.standbyTasks().size() <= 1);
+}
+
+@Test
+public void testAssignWithStandbyReplicasBalanceDense() {
+    builder.addSource(null, "source1", null, null, null, "topic1");
+    builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+    builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+    final List<String> topics = asList("topic1");
+
+    createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+    adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+        singletonList(APPLICATION_ID + "-store1-changelog"),
+        singletonList(3))
+    );
+    configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+    subscriptions.put("consumer10",
+        new Subscription(
+            topics,
+            getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+    subscriptions.put("consumer20",
+        new Subscription(
+            topics,
+            getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+
+    final Map<String, Assignment> assignments =
+        partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+    // Consumers
+    final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+    final As

[GitHub] [kafka] showuon commented on pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

2022-02-20 Thread GitBox


showuon commented on pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#issuecomment-1046440232


   @cadonna , do you want a second pair of eyes to look at the PR? Thanks.







[GitHub] [kafka] showuon commented on a change in pull request #11794: MINOR: Add links to connector configs in TOC

2022-02-20 Thread GitBox


showuon commented on a change in pull request #11794:
URL: https://github.com/apache/kafka/pull/11794#discussion_r810744353



##
File path: docs/toc.html
##
@@ -45,6 +45,10 @@
 3.3 Producer Configs
 3.4 Consumer Configs
 3.5 Kafka Connect Configs
+
+Source Connector 
Configs
+Sink Connector 
Configs
+

Review comment:
   It's good to have these 2 sub-categories in the TOC. But with this 
TOC, readers will likely think there are only 2 categories under `3.5 Kafka Connect 
Configs`, when there is actually another sub-category, `Kafka Connect Configs`, as well. 
Do you think we can make the TOC as below:
   - 3.5 Kafka Connect Configs
  - Kafka Connect Configs
  - Source Connector Configs
  - Sink Connector Configs
   
   And maybe add `3.5.1` and `3.5.2` in front of `Source Connector Configs` 
and `Sink Connector Configs`. WDYT?








[GitHub] [kafka] ruanwenjun commented on a change in pull request #11754: MINOR: Optimize collection method in Utils

2022-02-20 Thread GitBox


ruanwenjun commented on a change in pull request #11754:
URL: https://github.com/apache/kafka/pull/11754#discussion_r810732547



##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -764,10 +764,7 @@ public static ByteBuffer ensureCapacity(ByteBuffer existingBuffer, int newLength
  */
 @SafeVarargs
 public static <T> Set<T> mkSet(T... elems) {
-    Set<T> result = new HashSet<>((int) (elems.length / 0.75) + 1);
-    for (T elem : elems)
-        result.add(elem);
-    return result;
+    return new HashSet<>(Arrays.asList(elems));

Review comment:
   @splett2 Yes, agree with you: the `mkSet` method is only used in some test 
code and static variables. This PR wasn't made for performance reasons; it just 
hopes to improve readability.
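For reference, a minimal sketch of the refactored helper, mirroring the one-liner in the diff above; the surrounding class is hypothetical:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class MkSetSketch {
    // Same behavior as the old hand-rolled loop: build a mutable
    // HashSet from varargs, deduplicating as it goes.
    @SafeVarargs
    static <T> Set<T> mkSet(T... elems) {
        return new HashSet<>(Arrays.asList(elems));
    }

    public static void main(String[] args) {
        Set<String> s = mkSet("a", "b", "a");
        if (s.size() != 2 || !s.contains("a") || !s.contains("b"))
            throw new AssertionError("mkSet should deduplicate varargs");
        System.out.println("ok");
    }
}
```

The only behavioral difference from the original is the initial capacity hint, which does not affect the resulting set's contents.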








[GitHub] [kafka] tim-patterson commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

2022-02-20 Thread GitBox


tim-patterson commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r810711885



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -102,17 +109,26 @@ static int assignActiveTaskMovements(final Map> tasksToC
     final int movementsNeeded = taskMovements.size();
 
     for (final TaskMovement movement : taskMovements) {
-        final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
+        // Attempt to find a caught up standby, otherwise find any caught up client,
+        // failing that use the most caught up client.
+        UUID sourceClient = caughtUpClientsByTaskLoad.poll(
             movement.task,
             c -> clientStates.get(c).hasStandbyTask(movement.task)
         );
-        if (standbySourceClient == null) {
-            // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-            final UUID sourceClient = requireNonNull(
-                caughtUpClientsByTaskLoad.poll(movement.task),
-                "Tried to move task to caught-up client but none exist"
+
+        if (sourceClient == null) {
+            sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
+        }
+
+        if (sourceClient == null) {
+            sourceClient = requireNonNull(
+                mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
+                "Tried to move task to more caught-up client but none exist"
             );
+        }
 
+        if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
+            // there's not a standby available to take over the task, so we'll schedule a warmup instead

Review comment:
   I've done it with the nested if/elses but stopped reusing the same 
`sourceClient` var to try to make it a little clearer.
   Let me know what you think
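The fallback order discussed in this thread (a caught-up standby first, then any caught-up client, then the most caught-up client) can be sketched generically. This is an illustrative helper under assumed names, not the actual `TaskMovement` code:

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.function.Supplier;

public class FallbackSketch {
    // Evaluate candidate sources lazily, in priority order, and take
    // the first one that actually yields a client.
    @SafeVarargs
    static <T> Optional<T> firstPresent(Supplier<Optional<T>>... candidates) {
        return Arrays.stream(candidates)
                .map(Supplier::get)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst();
    }

    public static void main(String[] args) {
        Optional<String> chosen = firstPresent(
                Optional::empty,                       // no caught-up standby
                Optional::empty,                       // no caught-up client at all
                () -> Optional.of("most-caught-up"));  // fall back to most caught-up
        if (!chosen.orElse("").equals("most-caught-up"))
            throw new AssertionError("should fall back to the last candidate");
        System.out.println("ok");
    }
}
```

Keeping the candidates as lazy suppliers means the cheaper lookups short-circuit the more expensive fallback, which is the same shape as the sequential null checks in the diff above.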








[GitHub] [kafka] mimaison opened a new pull request #11794: MINOR: Add links to connector configs in TOC

2022-02-20 Thread GitBox


mimaison opened a new pull request #11794:
URL: https://github.com/apache/kafka/pull/11794


   Add links to Sink and Source connectors configurations
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] tim-patterson commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

2022-02-20 Thread GitBox


tim-patterson commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r810597642



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
 /**
  * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
- * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
- * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+ * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+ * group id to spread subtopologies across threads and further balance the workload.
+ * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+ * are evenly distributed
  */
-static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-  final Collection<TaskId> statelessTasksToAssign,
-  final SortedSet<String> consumers,
-  final ClientState state) {
+static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+  final SortedSet<String> consumers,
+  final ClientState state,
+  final Map<String, Integer> threadLoad) {
 final Map<String, List<TaskId>> assignment = new HashMap<>();
 for (final String consumer : consumers) {
 assignment.put(consumer, new ArrayList<>());
 }
 
-final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-Collections.sort(unassignedStatelessTasks);
-
-final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+int totalTasks = tasksToAssign.size();
+for (final Integer threadTaskCount : threadLoad.values()) {
+totalTasks += threadTaskCount;
+}
 
-final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
 final Queue<String> consumersToFill = new LinkedList<>();
 // keep track of tasks that we have to skip during the first pass in case we can reassign them later
 // using tree-map to make sure the iteration ordering over keys are preserved
 final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-if (!unassignedStatefulTasks.isEmpty()) {
-// First assign stateful tasks to previous owner, up to the min expected tasks/thread
+if (!unassignedTasks.isEmpty()) {
+// First assign tasks to previous owner, up to the min expected tasks/thread
 for (final String consumer : consumers) {
 final List<TaskId> threadAssignment = assignment.get(consumer);
 
 for (final TaskId task : state.prevTasksByLag(consumer)) {
-if (unassignedStatefulTasks.contains(task)) {
-if (threadAssignment.size() < minStatefulTasksPerThread) {
+if (unassignedTasks.contains(task)) {
+final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+if (threadTaskCount < minTasksPerThread) {
 threadAssignment.add(task);
-unassignedStatefulTasks.remove(task);
+unassignedTasks.remove(task);
 } else {
 unassignedTaskToPreviousOwner.put(task, consumer);
 }
 }
 }
 
-if (threadAssignment.size() < minStatefulTasksPerThread) {
+final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+if (threadTaskCount < minTasksPerThread) {
 consumersToFill.offer(consumer);
 }
 }
 
 // Next interleave remaining unassigned tasks amongst unfilled consumers
 while (!consumersToFill.isEmpty()) {
-final TaskId task = unassignedStatefulTasks.poll();
+final TaskId task = unassignedTasks.poll();
 if (task != null) {
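
The key change in the diff above is that the per-thread floor now counts tasks assigned in earlier calls (the `threadLoad` map) toward the total, so a second pass assigning standbys balances against actives already placed. A minimal standalone sketch of just that computation (the class and method names here are hypothetical, not from the Kafka codebase):

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class ThreadLoadMath {
    // Compute the minimum tasks per thread, counting tasks already assigned
    // in earlier calls (threadLoad) toward the total, as the diff does.
    static int minTasksPerThread(final Collection<String> tasksToAssign,
                                 final Map<String, Integer> threadLoad,
                                 final int numConsumers) {
        int totalTasks = tasksToAssign.size();
        for (final Integer threadTaskCount : threadLoad.values()) {
            totalTasks += threadTaskCount;
        }
        return (int) Math.floor(((double) totalTasks) / numConsumers);
    }

    public static void main(String[] args) {
        // 4 standbys still to assign, 5 actives already placed across
        // 3 threads -> floor(9 / 3) = 3 tasks per thread
        final int min = minTasksPerThread(
            List.of("0_0", "0_1", "1_0", "1_1"),
            Map.of("consumer-1", 2, "consumer-2", 2, "consumer-3", 1),
            3);
        System.out.println(min); // prints 3
    }
}
```

A thread is considered "filled" once its new assignment plus its prior load reaches this floor, which is why the diff also adds `threadLoad.getOrDefault(consumer, 0)` to each size check.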

[GitHub] [kafka] hongyiz6639 opened a new pull request #11793: MINOR: Skip fsync on parent directory to start Kafka on ZOS

2022-02-20 Thread GitBox


hongyiz6639 opened a new pull request #11793:
URL: https://github.com/apache/kafka/pull/11793


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

