[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851: URL: https://github.com/apache/kafka/pull/10851#discussion_r810857967 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java ## @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_0; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_2; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ClientTagAwareStandbyTaskAssignorTest { +private static final String ZONE_TAG = "zone"; +private static final String CLUSTER_TAG = "cluster"; + +private static final String ZONE_1 = "zone1"; +private static final String ZONE_2 = "zone2"; +private static final String ZONE_3 = "zone3"; + +private static final String CLUSTER_1 = "cluster1"; +private static final String CLUSTER_2 = "cluster2"; +private static final String CLUSTER_3 = "cluster3"; + 
+private static final UUID UUID_1 = uuidForInt(1); +private static final UUID UUID_2 = uuidForInt(2); +private static final UUID UUID_3 = uuidForInt(3); +private static final UUID UUID_4 = uuidForInt(4); +private static final UUID UUID_5 = uuidForInt(5); +private static final UUID UUID_6 = uuidForInt(6); +private static final UUID UUID_7 = uuidForInt(7); +private static final UUID UUID_8 = uuidForInt(8); +private static final UUID UUID_9 = uuidForInt(9); + +@Test +public void shouldPermitTaskMovementWhenClientTagsMatch() { +final ClientTagAwareStandbyTaskAssignor standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor(); +final ClientState source = createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1))); +final ClientState destination = createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1))); + +assertTrue(standbyTaskAssignor.isAllowedTaskMovement(source, destination)); +} + +@Test +public void shouldDeclineTaskMovementWhenClientTagsDoNotMatch() { +final ClientTagAwareStandbyTaskAssignor standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor(); +final ClientState source = createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1))); +final ClientState destination = createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_1))); + +assertFalse(standbyTaskAssignor.isAllowedTaskMovement(source, destination)); +} + +@Test +public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() { +final Map clientStates = mkMap( +mkEntry(UUID_1, createClientStateWithCapacity(2, mkMap(mkEntr
[jira] [Updated] (KAFKA-13517) Update Admin::describeConfigs to allow fetching specific configurations
[ https://issues.apache.org/jira/browse/KAFKA-13517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vikas Singh updated KAFKA-13517: Description: A list of {{ConfigResource}} class is passed as argument to {{AdminClient::describeConfigs}} api to indicate configuration of the entities to fetch. The {{ConfigResource}} class is made up of two fields, name and type of entity. Kafka returns *all* configurations for the entities provided to the admin client api. This admin api in turn uses {{DescribeConfigsRequest}} kafka api to get the configuration for the entities in question. In addition to name and type of entity whose configuration to get, Kafka {{DescribeConfigsResource}} structure also lets users provide {{ConfigurationKeys}} list, which allows users to fetch only the configurations that are needed. However, this field isn't exposed in the {{ConfigResource}} class that is used by AdminClient, so users of AdminClient have no way to ask for specific configuration. The API always returns *all* configurations. Then the user of the {{AdminClient::describeConfigs}} go over the returned list and filter out the config keys that they are interested in. This results in boilerplate code for all users of {{AdminClient::describeConfigs}} api, in addition to being wasteful use of resource. It becomes painful in large cluster case where to fetch one configuration of all topics, we need to fetch all configuration of all topics, which can be huge in size. Creating this Jira to track changes proposed in KIP-823: https://confluentinc.atlassian.net/wiki/spaces/~953215099/pages/2707357918/KIP+Admin+describeConfigs+should+allow+fetching+specific+configurations was: A list of {{ConfigResource}} class is passed as argument to {{AdminClient::describeConfigs}} api to indicate configuration of the entities to fetch. The {{ConfigResource}} class is made up of two fields, name and type of entity. 
Kafka returns *all* configurations for the entities provided to the admin client api. This admin api in turn uses {{DescribeConfigsRequest}} kafka api to get the configuration for the entities in question. In addition to name and type of entity whose configuration to get, Kafka {{DescribeConfigsResource}} structure also lets users provide {{ConfigurationKeys}} list, which allows users to fetch only the configurations that are needed. However, this field isn't exposed in the {{ConfigResource}} class that is used by AdminClient, so users of AdminClient have no way to ask for specific configuration. The API always returns *all* configurations. Then the user of the {{AdminClient::describeConfigs}} go over the returned list and filter out the config keys that they are interested in. This results in boilerplate code for all users of {{AdminClient::describeConfigs}} api, in addition to being wasteful use of resource. It becomes painful in large cluster case where to fetch one configuration of all topics, we need to fetch all configuration of all topics, which can be huge in size. Creating this Jira to add same field (i.e. {{{}ConfigurationKeys{}}}) to the {{ConfigResource}} structure to bring it to parity to {{DescribeConfigsResource}} Kafka API structure. There should be no backward compatibility issue as the field will be optional and will behave same way if it is not specified (i.e. by passing null to backend kafka api) > Update Admin::describeConfigs to allow fetching specific configurations > --- > > Key: KAFKA-13517 > URL: https://issues.apache.org/jira/browse/KAFKA-13517 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.8.1, 3.0.0 >Reporter: Vikas Singh >Assignee: Vikas Singh >Priority: Major > > A list of {{ConfigResource}} class is passed as argument to > {{AdminClient::describeConfigs}} api to indicate configuration of the > entities to fetch. The {{ConfigResource}} class is made up of two fields, > name and type of entity. 
Kafka returns *all* configurations for the entities > provided to the admin client api. > This admin api in turn uses {{DescribeConfigsRequest}} kafka api to get the > configuration for the entities in question. In addition to name and type of > entity whose configuration to get, Kafka {{DescribeConfigsResource}} > structure also lets users provide {{ConfigurationKeys}} list, which allows > users to fetch only the configurations that are needed. > However, this field isn't exposed in the {{ConfigResource}} class that is > used by AdminClient, so users of AdminClient have no way to ask for specific > configuration. The API always returns *all* configurations. Then the user of > the {{AdminClient::describeConfigs}} go over the returned list and filter out > the config keys that they ar
[jira] [Updated] (KAFKA-13517) Update Admin::describeConfigs to allow fetching specific configurations
[ https://issues.apache.org/jira/browse/KAFKA-13517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vikas Singh updated KAFKA-13517: Summary: Update Admin::describeConfigs to allow fetching specific configurations (was: Add ConfigurationKeys to ConfigResource class) > Update Admin::describeConfigs to allow fetching specific configurations > --- > > Key: KAFKA-13517 > URL: https://issues.apache.org/jira/browse/KAFKA-13517 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.8.1, 3.0.0 >Reporter: Vikas Singh >Assignee: Vikas Singh >Priority: Major > > A list of {{ConfigResource}} class is passed as argument to > {{AdminClient::describeConfigs}} api to indicate configuration of the > entities to fetch. The {{ConfigResource}} class is made up of two fields, > name and type of entity. Kafka returns *all* configurations for the entities > provided to the admin client api. > This admin api in turn uses {{DescribeConfigsRequest}} kafka api to get the > configuration for the entities in question. In addition to name and type of > entity whose configuration to get, Kafka {{DescribeConfigsResource}} > structure also lets users provide {{ConfigurationKeys}} list, which allows > users to fetch only the configurations that are needed. > However, this field isn't exposed in the {{ConfigResource}} class that is > used by AdminClient, so users of AdminClient have no way to ask for specific > configuration. The API always returns *all* configurations. Then the user of > the {{AdminClient::describeConfigs}} go over the returned list and filter out > the config keys that they are interested in. > This results in boilerplate code for all users of > {{AdminClient::describeConfigs}} api, in addition to being wasteful use of > resource. It becomes painful in large cluster case where to fetch one > configuration of all topics, we need to fetch all configuration of all > topics, which can be huge in size. 
> Creating this Jira to add same field (i.e. {{{}ConfigurationKeys{}}}) to the > {{ConfigResource}} structure to bring it to parity to > {{DescribeConfigsResource}} Kafka API structure. There should be no backward > compatibility issue as the field will be optional and will behave same way if > it is not specified (i.e. by passing null to backend kafka api) -- This message was sent by Atlassian Jira (v8.20.1#820001)
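The client-side filtering boilerplate described in this issue can be sketched as follows. This is only a minimal illustration: plain Java maps stand in for the result of `AdminClient::describeConfigs`, and the class `ConfigFilterSketch`, the method `filterKeys`, and the sample topic and config values are hypothetical names chosen for the example, not part of Kafka's API.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: plain maps stand in for a describeConfigs result
// (resource name -> all config entries). None of these names are Kafka APIs.
final class ConfigFilterSketch {

    // Keep only the wanted keys for every resource and discard the rest.
    static Map<String, Map<String, String>> filterKeys(
            final Map<String, Map<String, String>> allConfigs,
            final Set<String> wantedKeys) {
        final Map<String, Map<String, String>> filtered = new LinkedHashMap<>();
        for (final Map.Entry<String, Map<String, String>> resource : allConfigs.entrySet()) {
            final Map<String, String> kept = new LinkedHashMap<>();
            for (final Map.Entry<String, String> config : resource.getValue().entrySet()) {
                if (wantedKeys.contains(config.getKey())) {
                    kept.put(config.getKey(), config.getValue());
                }
            }
            filtered.put(resource.getKey(), kept);
        }
        return filtered;
    }

    public static void main(final String[] args) {
        // Assumed sample data: every entry is "fetched" although one is needed.
        final Map<String, String> topicConfigs = new LinkedHashMap<>();
        topicConfigs.put("retention.ms", "604800000");
        topicConfigs.put("cleanup.policy", "delete");
        topicConfigs.put("segment.bytes", "1073741824");

        final Map<String, Map<String, String>> allConfigs = new LinkedHashMap<>();
        allConfigs.put("topic-a", topicConfigs);

        System.out.println(filterKeys(allConfigs, Collections.singleton("retention.ms")));
    }
}
```

Every caller has to repeat a loop of this shape today, and the discarded entries have already crossed the network, which is the waste the issue describes.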
[GitHub] [kafka] showuon commented on pull request #10259: MINOR: Provide valid examples in README page.
showuon commented on pull request #10259: URL: https://github.com/apache/kafka/pull/10259#issuecomment-1046526614 @kamalcph , thanks for the patch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon merged pull request #10259: MINOR: Provide valid examples in README page.
showuon merged pull request #10259: URL: https://github.com/apache/kafka/pull/10259
[GitHub] [kafka] showuon commented on a change in pull request #10259: MINOR: Provide valid examples in README page.
showuon commented on a change in pull request #10259: URL: https://github.com/apache/kafka/pull/10259#discussion_r810818791 ## File path: tests/README.md ## @@ -97,26 +97,26 @@ Examining CI run * Set BUILD_ID is travis ci's build id. E.g. build id is 169519874 for the following build ```bash -https://travis-ci.org/apache/kafka/builds/169519874 Review comment: Agree to remove the unused travis CI doc.
[GitHub] [kafka] kamalcph commented on pull request #10259: MINOR: Provide valid examples in README page.
kamalcph commented on pull request #10259: URL: https://github.com/apache/kafka/pull/10259#issuecomment-1046512916 @ijuma @showuon Please take a look
[jira] [Commented] (KAFKA-12399) Deprecate Log4J Appender
[ https://issues.apache.org/jira/browse/KAFKA-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495330#comment-17495330 ] Muthuveerappan commented on KAFKA-12399: I just learned that this is scheduled for April, as per [https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.2.0] > Deprecate Log4J Appender > > > Key: KAFKA-12399 > URL: https://issues.apache.org/jira/browse/KAFKA-12399 > Project: Kafka > Issue Type: Improvement > Components: logging >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Major > Labels: needs-kip > Fix For: 3.2.0 > > > As a follow-up to KAFKA-9366, we have to entirely remove the log4j 1.2.7 > dependency from the classpath by removing dependencies on log4j-appender.
[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495329#comment-17495329 ] Muthuveerappan commented on KAFKA-9366: --- Thanks :) > Upgrade log4j to log4j2 > --- > > Key: KAFKA-9366 > URL: https://issues.apache.org/jira/browse/KAFKA-9366 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0 >Reporter: leibo >Assignee: Dongjin Lee >Priority: Critical > Labels: needs-kip > Fix For: 3.2.0 > > > h2. CVE-2019-17571 Detail > Included in Log4j 1.2 is a SocketServer class that is vulnerable to > deserialization of untrusted data which can be exploited to remotely execute > arbitrary code when combined with a deserialization gadget when listening to > untrusted network traffic for log data. This affects Log4j versions up to 1.2 > up to 1.2.17. > > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
[GitHub] [kafka] showuon commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks
showuon commented on a change in pull request #11493: URL: https://github.com/apache/kafka/pull/11493#discussion_r810748411 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java ## @@ -997,6 +1002,152 @@ public void testAssignWithStandbyReplicas() { assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost()); } +@Test +public void testAssignWithStandbyReplicasBalanceSparse() { +builder.addSource(null, "source1", null, null, null, "topic1"); +builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); +builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor"); + +final List topics = asList("topic1"); + +createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); +adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( +singletonList(APPLICATION_ID + "-store1-changelog"), +singletonList(3)) +); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + +final List client1Consumers = asList("consumer10", "consumer11", "consumer12", "consumer13"); +final List client2Consumers = asList("consumer20", "consumer21", "consumer22"); + +for (final String consumerId : client1Consumers) { +subscriptions.put(consumerId, +new Subscription( +topics, +getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); +} +for (final String consumerId : client2Consumers) { +subscriptions.put(consumerId, +new Subscription( +topics, +getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); +} + +final Map assignments = +partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + +// Consumers +final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); +final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); +final AssignmentInfo info12 = 
AssignmentInfo.decode(assignments.get("consumer12").userData()); +final AssignmentInfo info13 = AssignmentInfo.decode(assignments.get("consumer13").userData()); +final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); +final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData()); +final AssignmentInfo info22 = AssignmentInfo.decode(assignments.get("consumer22").userData()); + +// Check each consumer has no more than 1 task +assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 1); +assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 1); +assertTrue(info12.activeTasks().size() + info12.standbyTasks().size() <= 1); +assertTrue(info13.activeTasks().size() + info13.standbyTasks().size() <= 1); +assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 1); +assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 1); +assertTrue(info22.activeTasks().size() + info22.standbyTasks().size() <= 1); +} + +@Test +public void testAssignWithStandbyReplicasBalanceDense() { +builder.addSource(null, "source1", null, null, null, "topic1"); +builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); +builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor"); + +final List topics = asList("topic1"); + +createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); +adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( +singletonList(APPLICATION_ID + "-store1-changelog"), +singletonList(3)) +); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + +subscriptions.put("consumer10", +new Subscription( +topics, +getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); +subscriptions.put("consumer20", +new Subscription( +topics, +getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + +final Map assignments = 
+partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + +// Consumers +final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); +final As
[GitHub] [kafka] showuon commented on pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks
showuon commented on pull request #11493: URL: https://github.com/apache/kafka/pull/11493#issuecomment-1046440232 @cadonna, do you want to have a second pair of eyes look at the PR? Thanks.
[GitHub] [kafka] showuon commented on a change in pull request #11794: MINOR: Add links to connector configs in TOC
showuon commented on a change in pull request #11794: URL: https://github.com/apache/kafka/pull/11794#discussion_r810744353 ## File path: docs/toc.html ## @@ -45,6 +45,10 @@ 3.3 Producer Configs 3.4 Consumer Configs 3.5 Kafka Connect Configs + +Source Connector Configs +Sink Connector Configs + Review comment: It's good to have these 2 sub-categories in the TOC. But with this TOC, readers will usually think there are only 2 categories under `3.5 Kafka Connect Configs`, when there is in fact another sub-category, `Kafka Connect Configs`, there. Do you think we can make the TOC as below:
- 3.5 Kafka Connect Configs
  - Kafka Connect Configs
  - Source Connector Configs
  - Sink Connector Configs
And maybe put `3.5.1` and `3.5.2` in front of `Source Connector Configs` and `Sink Connector Configs`. WDYT?
[GitHub] [kafka] ruanwenjun commented on a change in pull request #11754: MINOR: Optimize collection method in Utils
ruanwenjun commented on a change in pull request #11754: URL: https://github.com/apache/kafka/pull/11754#discussion_r810732547 ## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java ## @@ -764,10 +764,7 @@ public static ByteBuffer ensureCapacity(ByteBuffer existingBuffer, int newLength */ @SafeVarargs public static <T> Set<T> mkSet(T... elems) { -Set<T> result = new HashSet<>((int) (elems.length / 0.75) + 1); -for (T elem : elems) -result.add(elem); -return result; +return new HashSet<>(Arrays.asList(elems)); Review comment: @splett2 Yes, I agree with you: the method `mkSet` is only used in some test code or for static variables. This PR is not motivated by performance; it is just meant to improve readability.
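The two forms of `mkSet` in the diff above can be compared side by side. The following is a minimal, self-contained sketch; the class name `MkSetSketch` and the method names `mkSetLoop` and `mkSetCompact` are made up for this illustration and do not exist in Kafka's `Utils`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative only: both variants are copied in spirit from the diff,
// under hypothetical names so they can live in one class.
final class MkSetSketch {

    // Removed form: pre-sized set filled in an explicit loop.
    @SafeVarargs
    static <T> Set<T> mkSetLoop(final T... elems) {
        final Set<T> result = new HashSet<>((int) (elems.length / 0.75) + 1);
        for (final T elem : elems) {
            result.add(elem);
        }
        return result;
    }

    // Added form: delegate to the HashSet copy constructor.
    @SafeVarargs
    static <T> Set<T> mkSetCompact(final T... elems) {
        return new HashSet<>(Arrays.asList(elems));
    }

    public static void main(final String[] args) {
        // Both variants drop the duplicate "a" and produce equal sets.
        System.out.println(mkSetLoop("a", "b", "a").equals(mkSetCompact("a", "b", "a")));
    }
}
```

Both variants return equal sets for the same arguments, which fits the comment that the change is about readability rather than behaviour.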
[GitHub] [kafka] tim-patterson commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist
tim-patterson commented on a change in pull request #11760: URL: https://github.com/apache/kafka/pull/11760#discussion_r810711885 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -102,17 +109,26 @@ static int assignActiveTaskMovements(final Map> tasksToC final int movementsNeeded = taskMovements.size(); for (final TaskMovement movement : taskMovements) { -final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll( +// Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most +// caught up client. +UUID sourceClient = caughtUpClientsByTaskLoad.poll( movement.task, c -> clientStates.get(c).hasStandbyTask(movement.task) ); -if (standbySourceClient == null) { -// there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead -final UUID sourceClient = requireNonNull( -caughtUpClientsByTaskLoad.poll(movement.task), -"Tried to move task to caught-up client but none exist" + +if (sourceClient == null) { +sourceClient = caughtUpClientsByTaskLoad.poll(movement.task); +} + +if (sourceClient == null) { +sourceClient = requireNonNull( +mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination), +"Tried to move task to more caught-up client but none exist" ); +} +if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) { +// there's not a standby available to take over the task, so we'll schedule a warmup instead Review comment: I've done it with the nested if/else blocks but stopped reusing the same `sourceClient` variable to try to make it a little clearer. Let me know what you think.
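The three-step fallback described in the diff comment (a caught-up standby, then any caught-up client, then the most caught-up client) can be sketched in isolation. This is a simplified illustration, not the code in `TaskMovement`: clients are plain strings instead of UUIDs, a list ordered by preference replaces the task-load priority queue, and `SourceClientFallbackSketch` and `chooseSourceClient` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch of the preference order only; it does not model
// task load, lag computation, or warmup scheduling.
final class SourceClientFallbackSketch {

    static String chooseSourceClient(final List<String> caughtUpClients,
                                     final Set<String> clientsWithStandby,
                                     final String mostCaughtUpClient) {
        // 1. Prefer a caught-up client that already hosts a standby of the task.
        for (final String client : caughtUpClients) {
            if (clientsWithStandby.contains(client)) {
                return client;
            }
        }
        // 2. Otherwise take any caught-up client.
        if (!caughtUpClients.isEmpty()) {
            return caughtUpClients.get(0);
        }
        // 3. Failing that, fall back to the most caught-up client, which must exist.
        return Objects.requireNonNull(
            mostCaughtUpClient,
            "Tried to move task to more caught-up client but none exist");
    }

    public static void main(final String[] args) {
        final Set<String> standbys = new HashSet<>(Collections.singletonList("client-2"));
        System.out.println(chooseSourceClient(Arrays.asList("client-1", "client-2"), standbys, "client-3"));
        System.out.println(chooseSourceClient(Collections.<String>emptyList(), standbys, "client-3"));
    }
}
```

Writing each step as an early return keeps a single result per call, which is one way to avoid reassigning a shared variable as the review comment discusses.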
[GitHub] [kafka] mimaison opened a new pull request #11794: MINOR: Add links to connector configs in TOC
mimaison opened a new pull request #11794: URL: https://github.com/apache/kafka/pull/11794 Add links to Sink and Source connectors configurations ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] tim-patterson commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks
tim-patterson commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r810597642

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ##
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                        if (threadTaskCount < minTasksPerThread) {
                             threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
+                            unassignedTasks.remove(task);
                         } else {
                             unassignedTaskToPreviousOwner.put(task, consumer);
                         }
                     }
                 }
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                if (threadTaskCount < minTasksPerThread) {
                     consumersToFill.offer(consumer);
                 }
             }
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
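The hunk above is cut off inside the interleaving loop, so the following is only a minimal, self-contained sketch of the load-aware idea it introduces, not the code from the pull request. It assumes tasks can be modelled as plain strings, that previous ownership is passed in as a map (a stand-in for `state.prevTasksByLag(consumer)`), and that leftover tasks go to the least-loaded thread; the class name `ThreadLoadSketch`, the method `assign`, the `previousTasks` parameter, and the remainder handling are all hypothetical. It also omits the `unassignedTaskToPreviousOwner` bookkeeping shown in the diff.

```java
import java.util.*;

// Hypothetical sketch: NOT the Kafka implementation, only the threadLoad idea from the hunk above.
final class ThreadLoadSketch {

    static Map<String, List<String>> assign(final Collection<String> tasksToAssign,
                                            final SortedSet<String> consumers,
                                            final Map<String, List<String>> previousTasks,
                                            final Map<String, Integer> threadLoad) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        final Map<String, List<String>> assignment = new TreeMap<>();
        for (final String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }

        // Count the tasks being assigned now plus the load carried over from earlier calls.
        int totalTasks = tasksToAssign.size();
        for (final int carriedLoad : threadLoad.values()) {
            totalTasks += carriedLoad;
        }
        final int minTasksPerThread = totalTasks / consumers.size(); // floor for non-negative ints

        final PriorityQueue<String> unassigned = new PriorityQueue<>(tasksToAssign);
        final Queue<String> consumersToFill = new LinkedList<>();

        // First pass: keep tasks with their previous owner while that thread is below the minimum.
        for (final String consumer : consumers) {
            final List<String> threadAssignment = assignment.get(consumer);
            final int load = threadLoad.getOrDefault(consumer, 0);
            for (final String task : previousTasks.getOrDefault(consumer, Collections.emptyList())) {
                if (unassigned.contains(task) && threadAssignment.size() + load < minTasksPerThread) {
                    threadAssignment.add(task);
                    unassigned.remove(task);
                }
            }
            if (threadAssignment.size() + load < minTasksPerThread) {
                consumersToFill.offer(consumer);
            }
        }

        // Second pass (assumed): hand out remaining tasks round-robin to threads still below the minimum.
        while (!consumersToFill.isEmpty() && !unassigned.isEmpty()) {
            final String consumer = consumersToFill.poll();
            final List<String> threadAssignment = assignment.get(consumer);
            threadAssignment.add(unassigned.poll());
            if (threadAssignment.size() + threadLoad.getOrDefault(consumer, 0) < minTasksPerThread) {
                consumersToFill.offer(consumer);
            }
        }

        // Remainder (assumed): give each leftover task to the currently least-loaded thread.
        while (!unassigned.isEmpty()) {
            String leastLoaded = null;
            int best = Integer.MAX_VALUE;
            for (final String consumer : consumers) {
                final int load = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
                if (load < best) {
                    best = load;
                    leastLoaded = consumer;
                }
            }
            assignment.get(leastLoaded).add(unassigned.poll());
        }
        return assignment;
    }

    public static void main(final String[] args) {
        final SortedSet<String> consumers = new TreeSet<>(Arrays.asList("t1", "t2"));
        final Map<String, Integer> threadLoad = new HashMap<>();
        threadLoad.put("t1", 2); // t1 already holds two active tasks
        final Map<String, List<String>> previous = new HashMap<>();
        previous.put("t1", Arrays.asList("s1")); // t1 used to own standby s1
        // prints {t1=[], t2=[s1, s2]}
        System.out.println(assign(Arrays.asList("s1", "s2"), consumers, previous, threadLoad));
    }
}
```

In this example the thread that already carries two active tasks receives no standbys even though it previously owned one of them, which is the behaviour the PR title describes. With an empty `threadLoad` map the sketch falls back to plain sticky assignment.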
[GitHub] [kafka] hongyiz6639 opened a new pull request #11793: MINOR: Skip fsync on parent directory to start Kafka on ZOS
hongyiz6639 opened a new pull request #11793:
URL: https://github.com/apache/kafka/pull/11793

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org