Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
ijuma commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1817732550

@cmccabe Please check all the Java/Scala versions before merging, this PR clearly broke the Java 8/Scala 2.12 build in multiple ways.
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
dajac commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1816721578

I've also seen `Build / JDK 8 and Scala 2.12 / testAssignmentAggregation() – kafka.server.AssignmentsManagerTest` failing consistently in a few builds ([example](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2392/tests)). @soarez Could you please double check this test?
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
dajac commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1815873092

It seems that a compilation error was introduced by this PR. From the last "JDK 8 and Scala 2.12" build:

```
[Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-14369/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala:157:36: type mismatch;
 found   : Seq[String] (in scala.collection)
 required: Seq[String] (in scala.collection.immutable)
```
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1815535748

Committed, thanks.
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe closed pull request #14369: KAFKA-15357: Aggregate and propagate assignments
URL: https://github.com/apache/kafka/pull/14369
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1815518652

> @cmccabe I'm confused, I thought that was the whole point of kafka.utils.Exit
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/Exit.scala#L21-L24
> https://github.com/apache/kafka/blob/00e5803cd3af89011254e734232308618403309d/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java
>
> The FaultHandler concept in QuorumController seems great, but I'm not sure how it fits here: afaict the fault is always fatal and we can intercept it in tests - what am I missing?

Exit has two very big flaws:

1. It uses static variables, so if you run tests in parallel, you can hit issues.
2. You have to remember to set it. Most tests leave Exit's default behavior intact (which is exiting the process).

Anyway, we can talk more about this later. We don't have to convert to using it in this PR.
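A minimal sketch of the static exit-override pattern being discussed, illustrating the two flaws above (the class and method names here are made up for illustration; this is not the actual `kafka.utils.Exit` implementation):

```java
import java.util.function.BiConsumer;

// Illustrative sketch only: a single static slot holds the exit behaviour.
final class ExitSketch {

    // Flaw 1: the override lives in static state, so tests running in parallel
    // in the same JVM can overwrite each other's exit procedure.
    private static volatile BiConsumer<Integer, String> exitProcedure =
            (statusCode, message) -> Runtime.getRuntime().exit(statusCode);

    static void exit(int statusCode, String message) {
        exitProcedure.accept(statusCode, message);
    }

    // Flaw 2: each test must remember to install (and later reset) an override;
    // otherwise the default above really terminates the JVM, and Jenkins with it.
    static void setExitProcedure(BiConsumer<Integer, String> procedure) {
        exitProcedure = procedure;
    }

    static void resetExitProcedure() {
        exitProcedure = (statusCode, message) -> Runtime.getRuntime().exit(statusCode);
    }

    private ExitSketch() { }
}
```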
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1814927398

@cmccabe @pprovenzano PTAL
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1810719729

> avoids calling exit(1) in junit tests, which will kill Jenkins dead (even after 3 decades of Java, we don't have the technology to intercept exit() in unit tests >:( )

@cmccabe I'm confused, I thought that was the whole point of `kafka.utils.Exit`
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/Exit.scala#L21-L24
https://github.com/apache/kafka/blob/00e5803cd3af89011254e734232308618403309d/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java

The `FaultHandler` concept in `QuorumController` seems great, but I'm not sure how it fits here: afaict the fault is always fatal and we can intercept it in tests - what am I missing?
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
OmniaGM commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1392653351

## core/src/test/java/kafka/server/AssignmentsManagerTest.java:

[The quoted hunk of the new AssignmentsManagerTest.java (license header, imports, fixture setup, and the start of testBuildRequestData()) and the review comment itself are truncated in this archive.]
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1392636901

## core/src/main/java/kafka/server/AssignmentsManager.java:

[full quoted hunk elided; it ends at the line under discussion:]

+    /**
+     * Assignments are dispatched to the controller this long after
+     * being submitted to {@link AssignmentsManager}, if there
+     * is no request in flight already.
+     * The interval is reset when a new assignment is submitted.
+     */
+    private static final long DISPATCH_INTERVAL_NS = TimeUnit.SECONDS.toNanos(1);

Review Comment: Maybe this syntax looks a bit deceiving. `TimeUnit.SECONDS.toNanos(1)` returns 10^9. This is 1 second, not 1 nano. Does that seem reasonable?
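For reference, a quick standalone check of the value under discussion (this snippet is not part of the PR):

```java
import java.util.concurrent.TimeUnit;

public class DispatchIntervalCheck {
    public static void main(String[] args) {
        // One second expressed in nanoseconds: 1_000_000_000, not a 1 ns delay.
        long dispatchIntervalNs = TimeUnit.SECONDS.toNanos(1);
        System.out.println(dispatchIntervalNs); // prints 1000000000
    }
}
```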
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
OmniaGM commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1810263837

I agree with @cmccabe regarding passing a reference for `AssignmentHandler` to `ReplicaManager`. Maybe one other note: many integration tests are failing now with the following error

```
Error applying topics delta in MetadataDelta up to 78: Assignment into unidentified directory: /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14369/tools/data/kafka-2762391009500302231
    at app//kafka.server.ReplicaManager.maybeNotifyPartitionAssignedToDirectory(ReplicaManager.scala:2362)
```

For example: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14369/11/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_8_and_Scala_2_12___testElectUncleanLeadersNoop_String__quorum_kraft/
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1392624170

## core/src/main/java/kafka/server/AssignmentsManager.java:

[full quoted hunk elided; it ends at the line under discussion:]

+        @Override
+        public void handleException(Throwable e) {
+            throw new RuntimeException(e);

Review Comment: This isn… [the rest of the comment is truncated in this archive]
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1392487870

## core/src/main/java/kafka/server/AssignmentsManager.java:

[quoted hunk and review comment truncated in this archive]
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391783416

## core/src/main/scala/kafka/server/BrokerServer.scala:
@@ -277,6 +279,24 @@ class BrokerServer(
       time
     )
+    if (config.logDirs.size > 1) {

Review Comment: Should we log something here, to indicate that JBOD mode is on?
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391782832

## core/src/main/java/kafka/server/AssignmentsManager.java:

[quoted hunk and review comment truncated in this archive]
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391781726

## core/src/main/java/kafka/server/AssignmentsManager.java:

[full quoted hunk elided; it ends at the line under discussion:]

+        @Override
+        public void handleException(Throwable e) {
+            throw new RuntimeException(e);

Review Comment: Throwin… [the rest of the comment is truncated in this archive]
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391780510

## core/src/main/java/kafka/server/AssignmentsManager.java:

[full quoted hunk elided; it ends at the line under discussion:]

+    private static final long DISPATCH_INTERVAL_NS = TimeUnit.SECONDS.toNanos(1);

Review Comment: Also, a lot of CPU clocks don't go down to an individual nanosecond. I don't know the exact details (it varies a lot by platform), but basically you probably can't schedule an actual 1 ns delay. Something like 1/4 of a ms would be more reasonable? Though that is still likely to be padded out to a longer length by the actual platform.
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391779074

## core/src/main/java/kafka/server/AssignmentsManager.java:

[full quoted hunk elided; it ends at the line under discussion:]

+    private static final long DISPATCH_INTERVAL_NS = TimeUnit.SECONDS.toNanos(1);

Review Comment: I don't think 1 nanosecond is going to give you much batching! At 4 GHz, 1 nanosecond is like 4 CPU cycles. Obviously you have pipelining, instruction reordering, yadda yadda, but you get the idea: this is NOT a reasonable amount of time to get anything done in.
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1809242988

I don't know if we want to do this in this PR, but one thing I would suggest for ReplicaManager is to use the `FaultHandler` paradigm we have in the `QuorumController` code. Specifically, when `QuorumController` hits an unrecoverable condition it will invoke the `FaultHandler`. In normal operation this maps to logging an error message and exiting. In unit tests, it maps to an exception, and also setting a flag that will cause any integration test to always fail. This accomplishes two things:

1. avoids calling `exit(1)` in junit tests, which will kill Jenkins dead (even after 3 decades of Java, we don't have the technology to intercept `exit()` in unit tests >:( )
2. allows us to always know if something is going wrong in the unit / integration test.

There can also be non-fatal fault handlers, which tend to make point 2 even more important (since many times throwing an exception or logging an ERROR will not prevent the test from succeeding!)
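A simplified sketch of the fault-handler pattern described above; the interface and class names are illustrative and not necessarily the exact types used by `QuorumController`:

```java
// Illustrative only: the production handler logs and halts, while the test handler
// records the fault and rethrows so the test fails instead of killing the JVM.
interface FaultHandlerSketch {
    RuntimeException handleFault(String message, Throwable cause);
}

final class ProcessExitingFaultHandler implements FaultHandlerSketch {
    @Override
    public RuntimeException handleFault(String message, Throwable cause) {
        System.err.println("FATAL: " + message);
        Runtime.getRuntime().halt(1);
        return null; // unreachable
    }
}

final class MockFaultHandlerSketch implements FaultHandlerSketch {
    private volatile RuntimeException firstFault;

    @Override
    public RuntimeException handleFault(String message, Throwable cause) {
        RuntimeException e = new RuntimeException(message, cause);
        if (firstFault == null) {
            firstFault = e; // flag that something went wrong during the test
        }
        return e;
    }

    // Called from test teardown so any recorded fault fails the test.
    void maybeRethrowFirstFault() {
        if (firstFault != null) {
            throw firstFault;
        }
    }
}
```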
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1809239029

I don't have a good place to put this comment (github only lets me comment on changed lines) but there is a problem with this code:

```
  private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
    override def doWork(): Unit = {
      val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
      if (haltBrokerOnDirFailure) {
        fatal(s"Halting broker because dir $newOfflineLogDir is offline")
        Exit.halt(1)
      }
      handleLogDirFailure(newOfflineLogDir)
    }
  }
```

If the directory that failed is the metadata directory, we need to exit unconditionally. This is because we have not implemented any way of failing over to a different directory for metadata. I suppose we should have a post-3.7 follow-up JIRA for this.
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391771537

## core/src/main/scala/kafka/server/ReplicaManager.scala:
@@ -2323,15 +2326,56 @@ class ReplicaManager(val config: KafkaConfig,
     }
     logManager.handleLogDirFailure(dir)
-    if (sendZkNotification)
+    if (notifyController) {
+      if (config.migrationEnabled) {
+        fatal(s"Shutdown broker because some log directory has failed during migration mode: $dir")
+        Exit.halt(1)

Review Comment: This seems wrong as our long-term solution, but I guess it's OK for now. (We can discuss more later I guess)
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391770284

## core/src/main/scala/kafka/server/ReplicaManager.scala:
@@ -269,7 +270,9 @@ class ReplicaManager(val config: KafkaConfig,
                      delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
                      threadNamePrefix: Option[String] = None,
                      val brokerEpochSupplier: () => Long = () => -1,
-                     addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None
+                     addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None,
+                     assignmentManager: Option[AssignmentsManager] = None,

Review Comment: I know this is a common pattern in `ReplicaManager` but it just seems so bad. We don't really care about the details of `AssignmentsManager` or `BrokerLifecycleManager` here. Shouldn't we just be passing a reference to an interface like `AssignmentHandler`?

```
interface AssignmentHandler {
    void onAssignment(TopicIdPartition topicIdPartition, Uuid directoryId);
    void propagateDirectoryFailure(Uuid directoryId);
}
```

etc. Then we can initialize a dummy version by default, to keep all the unit tests working without changes (if desired).
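A possible dummy default along the lines suggested above, assuming the hypothetical `AssignmentHandler` interface from the comment (neither type exists in the PR as written):

```java
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.common.TopicIdPartition;

// The interface as sketched in the review comment above (hypothetical, not in the PR).
interface AssignmentHandler {
    void onAssignment(TopicIdPartition topicIdPartition, Uuid directoryId);
    void propagateDirectoryFailure(Uuid directoryId);
}

// A dummy default that existing ReplicaManager unit tests could fall back to,
// so they keep working without changes.
final class NoOpAssignmentHandler implements AssignmentHandler {
    @Override
    public void onAssignment(TopicIdPartition topicIdPartition, Uuid directoryId) {
        // intentionally a no-op
    }

    @Override
    public void propagateDirectoryFailure(Uuid directoryId) {
        // intentionally a no-op
    }
}
```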
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391763417

## clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsRequest.java:
@@ -27,6 +27,8 @@ public class AssignReplicasToDirsRequest extends AbstractRequest {
+    public static final int MAX_ASSIGNMENTS_PER_REQUEST = 2250;

Review Comment: Can you add JavaDoc about this? Including the part where we want to keep it under 64kb.
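One possible shape for the requested JavaDoc; the wording and the wrapper class are illustrative and not taken from the PR (in the PR the constant lives in AssignReplicasToDirsRequest):

```java
// Hypothetical holder used only to show the doc comment.
public final class AssignReplicasToDirsRequestDocSketch {

    /**
     * The maximum number of partition assignments that may be carried in a single
     * AssignReplicasToDirsRequest. The limit is chosen so that the serialized
     * request stays comfortably under 64 KiB.
     */
    public static final int MAX_ASSIGNMENTS_PER_REQUEST = 2250;

    private AssignReplicasToDirsRequestDocSketch() { }
}
```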
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
pprovenzano commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1808630427

LGTM
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1805892764

Is there a way to trigger the tests again or do I need to push additional changes? All the test pipelines show a Jenkins error: "No space left on device".
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1389229875

## core/src/main/java/kafka/server/AssignmentsManager.java:

[quoted hunk and review comment truncated in this archive]
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1386805266 ## core/src/main/java/kafka/server/AssignmentsManager.java: ## @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData; +import org.apache.kafka.common.message.AssignReplicasToDirsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AssignReplicasToDirsRequest; +import org.apache.kafka.common.requests.AssignReplicasToDirsResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class AssignmentsManager { + +private static final Logger log = LoggerFactory.getLogger(AssignmentsManager.class); + +/** + * Assignments are dispatched to the controller this long after + * being submitted to {@link AssignmentsManager}, if there + * is no request in flight already. + * The interval is reset when a new assignment is submitted. 
+ */ +private static final long DISPATCH_INTERVAL_NS = TimeUnit.SECONDS.toNanos(1); + +private static final long MAX_BACKOFF_INTERVAL_MS = TimeUnit.SECONDS.toNanos(10); + +private final Time time; +private final NodeToControllerChannelManager channelManager; +private final int brokerId; +private final Supplier brokerEpochSupplier; +private final KafkaEventQueue eventQueue; + +// These variables should only be mutated from the event loop thread +private Map inflight = null; +private Map pending = new HashMap<>(); +private final ExponentialBackoff resendExponentialBackoff = +new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02); +private int failedAttempts = 0; + +public AssignmentsManager(Time time, + NodeToControllerChannelManager channelManager, + int brokerId, + Supplier brokerEpochSupplier) { +this.time = time; +this.channelManager = channelManager; +this.brokerId = brokerId; +this.brokerEpochSupplier = brokerEpochSupplier; +this.eventQueue = new KafkaEventQueue(time, +new LogContext("[AssignmentsManager id=" + brokerId + "]"), +"broker-" + brokerId + "-directory-assignments-manager-"); +} + +public void close() throws InterruptedException { +eventQueue.close(); +channelManager.shutdown(); +} + +public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) { +eventQueue.append(new AssignmentEvent(time.nanoseconds(), topicPartition, dirId)); +} + +// only for testing +void wakeup() { +eventQueue.wakeup(); +} + +/** + * Base class for all the events handled by {@link AssignmentsManager}. + */ +private abstract static class Event implements EventQueue.Event { +/** + * Override the default behavior in + * {@link EventQueue.Event#handleException} + * which swallows the exception. + */ +@Override +public void handleException(Throwable e) { +throw new RuntimeException(e); +} +} + +/** + * Hand
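The quoted class retries failed requests using `org.apache.kafka.common.utils.ExponentialBackoff`, constructed with an initial interval of 100, a multiplier of 2, a maximum interval, and 2% jitter. The following sketch shows roughly how the retry delay grows with `failedAttempts`; the 10,000 ms cap below is assumed purely for illustration (the quoted constant is expressed in nanoseconds).

```java
import org.apache.kafka.common.utils.ExponentialBackoff;

public class BackoffSketch {
    public static void main(String[] args) {
        // Same shape as the quoted configuration: initial 100, multiplier 2, 2% jitter.
        // The 10,000 ms maximum is an assumption for this example only.
        ExponentialBackoff resendBackoff = new ExponentialBackoff(100, 2, 10_000, 0.02);
        for (long failedAttempts = 0; failedAttempts <= 8; failedAttempts++) {
            // backoff(n) is roughly initial * multiplier^n, capped at the maximum, with jitter applied.
            System.out.printf("failedAttempts=%d -> delay ~%d%n",
                failedAttempts, resendBackoff.backoff(failedAttempts));
        }
    }
}
```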
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1386810622 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -2296,13 +2298,41 @@ class ReplicaManager(val config: KafkaConfig, if (sendZkNotification) if (zkClient.isEmpty) { -warn("Unable to propagate log dir failure via Zookeeper in KRaft mode") +val uuid = logManager.directoryId(dir) +if (uuid.isDefined && lifecycleManager.isDefined) + lifecycleManager.get.propagateDirectoryFailure(uuid.get) Review Comment: Ah! I think that variable should get a new name. The "Zk" part of it suggests that the notification goes to the controller via ZooKeeper rather than via an RPC, but what the flag actually does is determine whether the controller should be told about the failure at all, regardless of which notification mechanism is used. I'll update this PR to rename that variable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
pprovenzano commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1386788917 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -2296,13 +2298,41 @@ class ReplicaManager(val config: KafkaConfig, if (sendZkNotification) if (zkClient.isEmpty) { -warn("Unable to propagate log dir failure via Zookeeper in KRaft mode") +val uuid = logManager.directoryId(dir) +if (uuid.isDefined && lifecycleManager.isDefined) + lifecycleManager.get.propagateDirectoryFailure(uuid.get) Review Comment: Why would sendZkNotification be enabled in normal KRaft mode? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1386609951 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -2296,13 +2298,41 @@ class ReplicaManager(val config: KafkaConfig, if (sendZkNotification) if (zkClient.isEmpty) { -warn("Unable to propagate log dir failure via Zookeeper in KRaft mode") +val uuid = logManager.directoryId(dir) +if (uuid.isDefined && lifecycleManager.isDefined) + lifecycleManager.get.propagateDirectoryFailure(uuid.get) Review Comment: I think I'm missing something: why can this only happen during migration? AFAICT this will be called in normal KRaft operation, as long as the `logManager.handleLogDirFailure(dir)` call above does not shut down the broker because no online directories remain. We did agree that the broker should fail if a directory fails during migration. Apologies, I wasn't planning on making that change as part of this PR, but I can look into detecting the migration case and addressing it here too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
pprovenzano commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1385399770 ## core/src/main/java/kafka/server/AssignmentsManager.java: ## @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData; +import org.apache.kafka.common.message.AssignReplicasToDirsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AssignReplicasToDirsRequest; +import org.apache.kafka.common.requests.AssignReplicasToDirsResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class AssignmentsManager { + +private static final Logger log = LoggerFactory.getLogger(AssignmentsManager.class); + +/** + * Assignments are dispatched to the controller this long after + * being submitted to {@link AssignmentsManager}, if there + * is no request in flight already. + * The interval is reset when a new assignment is submitted. 
+ */ +private static final long DISPATCH_INTERVAL_NS = TimeUnit.SECONDS.toNanos(1); + +private static final long MAX_BACKOFF_INTERVAL_MS = TimeUnit.SECONDS.toNanos(10); + +private final Time time; +private final NodeToControllerChannelManager channelManager; +private final int brokerId; +private final Supplier brokerEpochSupplier; +private final KafkaEventQueue eventQueue; + +// These variables should only be mutated from the event loop thread +private Map inflight = null; +private Map pending = new HashMap<>(); +private final ExponentialBackoff resendExponentialBackoff = +new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02); +private int failedAttempts = 0; + +public AssignmentsManager(Time time, + NodeToControllerChannelManager channelManager, + int brokerId, + Supplier brokerEpochSupplier) { +this.time = time; +this.channelManager = channelManager; +this.brokerId = brokerId; +this.brokerEpochSupplier = brokerEpochSupplier; +this.eventQueue = new KafkaEventQueue(time, +new LogContext("[AssignmentsManager id=" + brokerId + "]"), +"broker-" + brokerId + "-directory-assignments-manager-"); +} + +public void close() throws InterruptedException { +eventQueue.close(); +channelManager.shutdown(); +} + +public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) { +eventQueue.append(new AssignmentEvent(time.nanoseconds(), topicPartition, dirId)); +} + +// only for testing +void wakeup() { +eventQueue.wakeup(); +} + +/** + * Base class for all the events handled by {@link AssignmentsManager}. + */ +private abstract static class Event implements EventQueue.Event { +/** + * Override the default behavior in + * {@link EventQueue.Event#handleException} + * which swallows the exception. + */ +@Override +public void handleException(Throwable e) { +throw new RuntimeException(e); +} +} + +/** + *
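The quoted `Event` base class overrides `handleException` so that failures raised while processing queue events are rethrown rather than silently swallowed by the default handler. Below is a self-contained sketch of that pattern using a hypothetical `QueueEvent` interface standing in for the real `org.apache.kafka.queue.EventQueue.Event`.

```java
// Hypothetical interface mirroring the pattern in the quoted code: the default
// exception handler drops errors, so a fail-fast base class rethrows them.
interface QueueEvent {
    void run() throws Exception;
    default void handleException(Throwable e) { /* default: swallow */ }
}

abstract class FailFastEvent implements QueueEvent {
    @Override
    public void handleException(Throwable e) {
        // Rethrow so the failure surfaces to whatever fault handling wraps the event loop.
        throw new RuntimeException(e);
    }
}
```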
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
pprovenzano commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1385385403 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -2296,13 +2298,41 @@ class ReplicaManager(val config: KafkaConfig, if (sendZkNotification) if (zkClient.isEmpty) { -warn("Unable to propagate log dir failure via Zookeeper in KRaft mode") +val uuid = logManager.directoryId(dir) +if (uuid.isDefined && lifecycleManager.isDefined) + lifecycleManager.get.propagateDirectoryFailure(uuid.get) Review Comment: When would we ever get into this state? If this can only occur during migration, I thought we agreed that we would fail the broker if a directory failed during migration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1795077968 @cmccabe @pprovenzano this one also builds on [KAFKA-15451](https://github.com/apache/kafka/pull/14368) (#14368). Please take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org