Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-18 Thread via GitHub


ijuma commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1817732550

   @cmccabe Please check all the Java/Scala versions before merging; this PR clearly broke the Java 8/Scala 2.12 build in multiple ways.





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-17 Thread via GitHub


dajac commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1816721578

   I've also seen `Build / JDK 8 and Scala 2.12 / testAssignmentAggregation() – 
kafka.server.AssignmentsManagerTest` failing consistently in a few builds 
([example](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2392/tests)).
 @soarez Could you please double check this test?





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-16 Thread via GitHub


dajac commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1815873092

   It seems that a compilation error was introduced by this PR. From the last 
"JDK 8 and Scala 2.12" build:
   ```
   [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-14369/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala:157:36: type mismatch;
    found   : Seq[String] (in scala.collection)
    required: Seq[String] (in scala.collection.immutable)
   ```





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-16 Thread via GitHub


cmccabe commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1815535748

   committed, thanks





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-16 Thread via GitHub


cmccabe closed pull request #14369: KAFKA-15357: Aggregate and propagate 
assignments
URL: https://github.com/apache/kafka/pull/14369





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-16 Thread via GitHub


cmccabe commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1815518652

   > @cmccabe I'm confused, I thought that was the whole point of 
kafka.utils.Exit 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/Exit.scala#L21-L24
 
https://github.com/apache/kafka/blob/00e5803cd3af89011254e734232308618403309d/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java
   > 
   > The FaultHandler concept in QuorumController seems great, but I'm not sure 
how it fits here: afaict the fault is always fatal and we can intercept it in 
tests - what am I missing?
   
   Exit has two very big flaws:
   
   1. It uses static variables, so if you run tests in parallel, you can hit issues.
   2. You have to remember to set it. Most tests leave Exit's default behavior intact (which is exiting the process).
   
   Anyway, we can talk more about this later. We don't have to convert to using it in this PR.
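   A minimal sketch of the failure mode described above, assuming nothing about the real `Exit` API beyond the idea of a single JVM-wide, overridable exit hook (class and method names below are illustrative only):
   
   ```
   final class StaticExit {
       // Flaw 1: one JVM-wide hook, shared by every test class in this process,
       // so parallel tests can clobber each other's overrides.
       private static volatile java.util.function.IntConsumer exitProcedure =
               status -> Runtime.getRuntime().halt(status);
   
       static void setExitProcedure(java.util.function.IntConsumer procedure) {
           exitProcedure = procedure;
       }
   
       static void resetExitProcedure() {
           exitProcedure = status -> Runtime.getRuntime().halt(status);
       }
   
       static void exit(int status) {
           // Flaw 2: if the current test forgot to install a no-op procedure,
           // this genuinely halts the JVM and takes the test worker with it.
           exitProcedure.accept(status);
       }
   }
   ```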





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-16 Thread via GitHub


soarez commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1814927398

   @cmccabe @pprovenzano PTAL





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-14 Thread via GitHub


soarez commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1810719729

   > avoids calling exit(1) in junit tests, which will kill Jenkins dead (even after 3 decades of Java, we don't have the technology to intercept exit() in unit tests >:( )
   
   @cmccabe I'm confused, I thought that was the whole point of 
`kafka.utils.Exit`
   
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/Exit.scala#L21-L24
   
https://github.com/apache/kafka/blob/00e5803cd3af89011254e734232308618403309d/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java
   
   The `FaultHandler` concept in `QuorumController` seems great, but I'm not 
sure how it fits here: afaict the fault is always fatal and we can intercept it 
in tests - what am I missing?





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-14 Thread via GitHub


OmniaGM commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1392653351


##
core/src/test/java/kafka/server/AssignmentsManagerTest.java:
##
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public class AssignmentsManagerTest {
+
+private static final Uuid TOPIC_1 = 
Uuid.fromString("88rnFIqYSZykX4ZSKv81bg");
+private static final Uuid TOPIC_2 = 
Uuid.fromString("VKCnzHdhR5uDQc1shqBYrQ");
+private static final Uuid DIR_1 = 
Uuid.fromString("cbgD8WdLQCyzLrFIMBhv3w");
+private static final Uuid DIR_2 = 
Uuid.fromString("zO0bDc0vSuam7Db9iH7rYQ");
+private static final Uuid DIR_3 = 
Uuid.fromString("CGBWbrFkRkeJQy6Aryzq2Q");
+
+private MockTime time;
+private NodeToControllerChannelManager channelManager;
+private AssignmentsManager manager;
+
+@BeforeEach
+public void setup() {
+time = new MockTime();
+channelManager = mock(NodeToControllerChannelManager.class);
+manager = new AssignmentsManager(time, channelManager, 8, () -> 100L);
+}
+
+@AfterEach
+void tearDown() throws InterruptedException {
+manager.close();
+}
+
+@Test
+void testBuildRequestData() {
+Map assignment = new HashMap() {{
+put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
+put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
+put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
+put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
+}};
+AssignReplicasToDirsRequestData built = 
AssignmentsManager.buildRequestData(8, 100L, assignment);
+AssignReplicasToDirsRequestData expected = new 
AssignReplicasToDirsRequestData()
+.setBrokerId(8)
+.setBrokerEpoch(100L)
+.setDirectories(Arrays.asList(
+new AssignReplicasToDirsRequestData.DirectoryData()
+.setId(DIR_2)
+.setTopics(Arrays.asList(
+new 
AssignReplicasToDirsRequestData.TopicData()
+.setTopicId(TOPIC_1)
+
.setPartitions(Collections.singletonList(
+new 
AssignReplicasToDirsRequestData.PartitionData()
+
.setPartitionIndex(2)
+)),
+new 
AssignReplicasToDirsRequestData.TopicData()
+.setTopicId(TOPIC_2)
+
.setPartitions(Coll

Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-14 Thread via GitHub


soarez commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1392636901


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);

Review Comment:
   Maybe this syntax looks a bit deceiving. `TimeUnit.SECONDS.toNanos(1)` returns `10⁹` (1,000,000,000). This is 1 second, not 1 nanosecond. Does that seem reasonable?
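   For reference, a tiny self-contained check (plain Java, not part of the PR) of what that conversion evaluates to:
   
   ```
   import java.util.concurrent.TimeUnit;
   
   public class ToNanosCheck {
       public static void main(String[] args) {
           // 1 second expressed in nanoseconds is 1_000_000_000 (10^9),
           // so a constant built this way is one second, not one nanosecond.
           System.out.println(TimeUnit.SECONDS.toNanos(1)); // 1000000000
       }
   }
   ```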






Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-14 Thread via GitHub


OmniaGM commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1810263837

   I agree with @cmccabe regarding passing a reference for `AssignmentHandler` to `ReplicaManager`.
   One other note: many integration tests are now failing with the following error
   ```
   Error applying topics delta in MetadataDelta up to 78: Assignment into unidentified directory: /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14369/tools/data/kafka-2762391009500302231
       at app//kafka.server.ReplicaManager.maybeNotifyPartitionAssignedToDirectory(ReplicaManager.scala:2362)
   ```
   For example: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14369/11/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_8_and_Scala_2_12___testElectUncleanLeadersNoop_String__quorum_kraft/





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-14 Thread via GitHub


soarez commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1392624170


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);
+
+private static final long MAX_BACKOFF_INTERVAL_MS = 
TimeUnit.SECONDS.toNanos(10);
+
+private final Time time;
+private final NodeToControllerChannelManager channelManager;
+private final int brokerId;
+private final Supplier brokerEpochSupplier;
+private final KafkaEventQueue eventQueue;
+
+// These variables should only be mutated from the event loop thread
+private Map inflight = null;
+private Map pending = new HashMap<>();
+private final ExponentialBackoff resendExponentialBackoff =
+new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
+private int failedAttempts = 0;
+
+public AssignmentsManager(Time time,
+  NodeToControllerChannelManager channelManager,
+  int brokerId,
+  Supplier brokerEpochSupplier) {
+this.time = time;
+this.channelManager = channelManager;
+this.brokerId = brokerId;
+this.brokerEpochSupplier = brokerEpochSupplier;
+this.eventQueue = new KafkaEventQueue(time,
+new LogContext("[AssignmentsManager id=" + brokerId + "]"),
+"broker-" + brokerId + "-directory-assignments-manager-");
+}
+
+public void close() throws InterruptedException {
+eventQueue.close();
+channelManager.shutdown();
+}
+
+public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
+eventQueue.append(new AssignmentEvent(time.nanoseconds(), 
topicPartition, dirId));
+}
+
+// only for testing
+void wakeup() {
+eventQueue.wakeup();
+}
+
+/**
+ * Base class for all the events handled by {@link AssignmentsManager}.
+ */
+private abstract static class Event implements EventQueue.Event {
+/**
+ * Override the default behavior in
+ * {@link EventQueue.Event#handleException}
+ * which swallows the exception.
+ */
+@Override
+public void handleException(Throwable e) {
+throw new RuntimeException(e);

Review Comment:
   This isn

Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-14 Thread via GitHub


soarez commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1392487870


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);
+
+private static final long MAX_BACKOFF_INTERVAL_MS = 
TimeUnit.SECONDS.toNanos(10);
+
+private final Time time;
+private final NodeToControllerChannelManager channelManager;
+private final int brokerId;
+private final Supplier brokerEpochSupplier;
+private final KafkaEventQueue eventQueue;
+
+// These variables should only be mutated from the event loop thread
+private Map inflight = null;
+private Map pending = new HashMap<>();
+private final ExponentialBackoff resendExponentialBackoff =
+new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
+private int failedAttempts = 0;
+
+public AssignmentsManager(Time time,
+  NodeToControllerChannelManager channelManager,
+  int brokerId,
+  Supplier brokerEpochSupplier) {
+this.time = time;
+this.channelManager = channelManager;
+this.brokerId = brokerId;
+this.brokerEpochSupplier = brokerEpochSupplier;
+this.eventQueue = new KafkaEventQueue(time,
+new LogContext("[AssignmentsManager id=" + brokerId + "]"),
+"broker-" + brokerId + "-directory-assignments-manager-");
+}
+
+public void close() throws InterruptedException {
+eventQueue.close();
+channelManager.shutdown();
+}
+
+public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
+eventQueue.append(new AssignmentEvent(time.nanoseconds(), 
topicPartition, dirId));
+}
+
+// only for testing
+void wakeup() {
+eventQueue.wakeup();
+}
+
+/**
+ * Base class for all the events handled by {@link AssignmentsManager}.
+ */
+private abstract static class Event implements EventQueue.Event {
+/**
+ * Override the default behavior in
+ * {@link EventQueue.Event#handleException}
+ * which swallows the exception.
+ */
+@Override
+public void handleException(Throwable e) {
+throw new RuntimeException(e);
+}
+}
+
+/**

Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391783416


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -277,6 +279,24 @@ class BrokerServer(
 time
   )
 
+  if (config.logDirs.size > 1) {

Review Comment:
   Should we log something here, to indicate that JBOD mode is on?






Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391782832


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);
+
+private static final long MAX_BACKOFF_INTERVAL_MS = 
TimeUnit.SECONDS.toNanos(10);
+
+private final Time time;
+private final NodeToControllerChannelManager channelManager;
+private final int brokerId;
+private final Supplier brokerEpochSupplier;
+private final KafkaEventQueue eventQueue;
+
+// These variables should only be mutated from the event loop thread
+private Map inflight = null;
+private Map pending = new HashMap<>();
+private final ExponentialBackoff resendExponentialBackoff =
+new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
+private int failedAttempts = 0;
+
+public AssignmentsManager(Time time,
+  NodeToControllerChannelManager channelManager,
+  int brokerId,
+  Supplier brokerEpochSupplier) {
+this.time = time;
+this.channelManager = channelManager;
+this.brokerId = brokerId;
+this.brokerEpochSupplier = brokerEpochSupplier;
+this.eventQueue = new KafkaEventQueue(time,
+new LogContext("[AssignmentsManager id=" + brokerId + "]"),
+"broker-" + brokerId + "-directory-assignments-manager-");
+}
+
+public void close() throws InterruptedException {
+eventQueue.close();
+channelManager.shutdown();
+}
+
+public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
+eventQueue.append(new AssignmentEvent(time.nanoseconds(), 
topicPartition, dirId));
+}
+
+// only for testing
+void wakeup() {
+eventQueue.wakeup();
+}
+
+/**
+ * Base class for all the events handled by {@link AssignmentsManager}.
+ */
+private abstract static class Event implements EventQueue.Event {
+/**
+ * Override the default behavior in
+ * {@link EventQueue.Event#handleException}
+ * which swallows the exception.
+ */
+@Override
+public void handleException(Throwable e) {
+throw new RuntimeException(e);
+}
+}
+
+/*

Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391781726


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);
+
+private static final long MAX_BACKOFF_INTERVAL_MS = 
TimeUnit.SECONDS.toNanos(10);
+
+private final Time time;
+private final NodeToControllerChannelManager channelManager;
+private final int brokerId;
+private final Supplier brokerEpochSupplier;
+private final KafkaEventQueue eventQueue;
+
+// These variables should only be mutated from the event loop thread
+private Map inflight = null;
+private Map pending = new HashMap<>();
+private final ExponentialBackoff resendExponentialBackoff =
+new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
+private int failedAttempts = 0;
+
+public AssignmentsManager(Time time,
+  NodeToControllerChannelManager channelManager,
+  int brokerId,
+  Supplier brokerEpochSupplier) {
+this.time = time;
+this.channelManager = channelManager;
+this.brokerId = brokerId;
+this.brokerEpochSupplier = brokerEpochSupplier;
+this.eventQueue = new KafkaEventQueue(time,
+new LogContext("[AssignmentsManager id=" + brokerId + "]"),
+"broker-" + brokerId + "-directory-assignments-manager-");
+}
+
+public void close() throws InterruptedException {
+eventQueue.close();
+channelManager.shutdown();
+}
+
+public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
+eventQueue.append(new AssignmentEvent(time.nanoseconds(), 
topicPartition, dirId));
+}
+
+// only for testing
+void wakeup() {
+eventQueue.wakeup();
+}
+
+/**
+ * Base class for all the events handled by {@link AssignmentsManager}.
+ */
+private abstract static class Event implements EventQueue.Event {
+/**
+ * Override the default behavior in
+ * {@link EventQueue.Event#handleException}
+ * which swallows the exception.
+ */
+@Override
+public void handleException(Throwable e) {
+throw new RuntimeException(e);

Review Comment:
   Throwin

Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391780510


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);

Review Comment:
   Also, a lot of CPU clocks don't go down to an individual nanosecond. I don't know the exact details (it varies a lot by platform) but basically you probably can't schedule an actual 1 ns delay.
   
   Something like a quarter of a millisecond would be more reasonable? Though that is still likely to be padded out to a longer length by the actual platform.






Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391779074


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);

Review Comment:
   I don't think 1 nanosecond is going to give you much batching!
   
   At 4 GHz, 1 nanosecond is like 4 CPU cycles.
   
   Obviously you have pipelining, instruction reordering, yadda yadda, but you get the idea: this is NOT a reasonable amount of time to get anything done in.






Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1809242988

   I don't know if we want to do this in this PR, but one thing I would suggest 
for ReplicaManager is to use the `FaultHandler` paradigm we have in the 
`QuorumController` code.
   
   Specifically, when `QuorumController` hits an unrecoverable condition it 
will invoke the `FaultHandler`. In normal operation this maps to logging an 
error message and exiting. In unit tests, it maps to an exception, and also 
setting a flag that will cause any integration test to always fail.
   
   This accomplishes two things:
   1. avoids calling `exit(1)` in junit tests, which will kill Jenkins dead (even after 3 decades of Java, we don't have the technology to intercept `exit()` in unit tests >:( )
   2. allows us to always know if something is going wrong in the unit / 
integration test.
   
   There can also be non-fatal fault handlers, which tend to make point 2 even 
more important (since many times throwing an exception or logging an ERROR will 
not prevent the test from succeeding!)
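   A rough sketch of the shape of that paradigm, under the assumption that the handler takes a failure message plus cause and returns an exception; the names below are illustrative and may not match the actual fault-handler classes in the Kafka code base:
   
   ```
   interface FaultHandler {
       RuntimeException handleFault(String failureMessage, Throwable cause);
   }
   
   // Production flavour: log the fault and terminate the process.
   class ProcessExitingFaultHandler implements FaultHandler {
       @Override
       public RuntimeException handleFault(String failureMessage, Throwable cause) {
           System.err.println("Encountered fatal fault: " + failureMessage);
           Runtime.getRuntime().halt(1);
           return null; // unreachable
       }
   }
   
   // Test flavour: remember the first fault and surface it as an exception, so
   // the JUnit worker survives and the surrounding test can be failed afterwards.
   class MockFaultHandler implements FaultHandler {
       private volatile RuntimeException firstFault;
   
       @Override
       public RuntimeException handleFault(String failureMessage, Throwable cause) {
           RuntimeException fault = new RuntimeException(failureMessage, cause);
           if (firstFault == null) {
               firstFault = fault;
           }
           return fault;
       }
   
       RuntimeException firstFaultOrNull() {
           return firstFault;
       }
   }
   ```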





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1809239029

   I don't have a good place to put this comment (github only lets me comment 
on changed lines) but there is a problem with this code:
   
   ```
   private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
     override def doWork(): Unit = {
       val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
       if (haltBrokerOnDirFailure) {
         fatal(s"Halting broker because dir $newOfflineLogDir is offline")
         Exit.halt(1)
       }
       handleLogDirFailure(newOfflineLogDir)
     }
   }
   ```
   
   If the directory that failed is the metadata directory, we need to exit 
unconditionally. This is because we have not implemented any way of failing 
over to a different directory for metadata.
   
   I suppose we should have a post-3.7 follow-up JIRA for this.





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391771537


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2323,15 +2326,56 @@ class ReplicaManager(val config: KafkaConfig,
 }
 logManager.handleLogDirFailure(dir)
 
-if (sendZkNotification)
+if (notifyController) {
+  if (config.migrationEnabled) {
+fatal(s"Shutdown broker because some log directory has failed during 
migration mode: $dir")
+Exit.halt(1)

Review Comment:
   This seems wrong as our long-term solution, but I guess it's OK for now. (We 
can discuss more later I guess)






Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391770284


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -269,7 +270,9 @@ class ReplicaManager(val config: KafkaConfig,
  delayedRemoteFetchPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
  threadNamePrefix: Option[String] = None,
  val brokerEpochSupplier: () => Long = () => -1,
- addPartitionsToTxnManager: 
Option[AddPartitionsToTxnManager] = None
+ addPartitionsToTxnManager: 
Option[AddPartitionsToTxnManager] = None,
+ assignmentManager: Option[AssignmentsManager] = None,

Review Comment:
   I know this is a common pattern in `ReplicaManager` but it just seems so 
bad. We don't really care about the details of `AssignmentsManager` or 
`BrokerLifecycleManager` here. Shouldn't we just be passing a reference to an 
interface like `AssignmentHandler`?
   
   ```
   interface AssignmentHandler {
       void onAssignment(TopicIdPartition topicIdPartition, Uuid dirId);
       void propagateDirectoryFailure(Uuid directoryId);
   }
   ```
   
   etc.
   
   Then we can initialize a dummy version by default, to keep all the unit 
tests working without changes (if desired).
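   For illustration, a no-op default along those lines could look like the following; it assumes the `AssignmentHandler` interface sketched above, and none of these names come from the PR itself:
   
   ```
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.server.common.TopicIdPartition;
   
   // Dummy implementation that existing unit tests could receive by default.
   final class NoOpAssignmentHandler implements AssignmentHandler {
       @Override
       public void onAssignment(TopicIdPartition topicIdPartition, Uuid dirId) {
           // Intentionally ignored: tests that don't exercise directory
           // assignments keep working without an AssignmentsManager.
       }
   
       @Override
       public void propagateDirectoryFailure(Uuid directoryId) {
           // Intentionally ignored.
       }
   }
   ```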






Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391763417


##
clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsRequest.java:
##
@@ -27,6 +27,8 @@
 
 public class AssignReplicasToDirsRequest extends AbstractRequest {
 
+public static final int MAX_ASSIGNMENTS_PER_REQUEST = 2250;

Review Comment:
   Can you add JavaDoc about this, including the part where we want to keep it under 64 KB?
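   One possible wording, offered only as a suggestion and taking the 64 KB figure from this review thread rather than from the final code:
   
   ```
   /**
    * Maximum number of partition assignments carried in a single
    * AssignReplicasToDirsRequest. The limit bounds the request size;
    * the intent is to keep a full request comfortably under 64 KB.
    */
   public static final int MAX_ASSIGNMENTS_PER_REQUEST = 2250;
   ```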






Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


pprovenzano commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1808630427

   LGTM





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-10 Thread via GitHub


soarez commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1805892764

   Is there a way to trigger the tests again or do I need to push additional 
changes? All the test pipelines show a Jenkins error: "No space left on device".
   





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-10 Thread via GitHub


soarez commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1389229875


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);
+
+private static final long MAX_BACKOFF_INTERVAL_MS = 
TimeUnit.SECONDS.toNanos(10);
+
+private final Time time;
+private final NodeToControllerChannelManager channelManager;
+private final int brokerId;
+private final Supplier<Long> brokerEpochSupplier;
+private final KafkaEventQueue eventQueue;
+
+// These variables should only be mutated from the event loop thread
+private Map inflight = null;
+private Map pending = new HashMap<>();
+private final ExponentialBackoff resendExponentialBackoff =
+new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
+private int failedAttempts = 0;
+
+public AssignmentsManager(Time time,
+  NodeToControllerChannelManager channelManager,
+  int brokerId,
+  Supplier<Long> brokerEpochSupplier) {
+this.time = time;
+this.channelManager = channelManager;
+this.brokerId = brokerId;
+this.brokerEpochSupplier = brokerEpochSupplier;
+this.eventQueue = new KafkaEventQueue(time,
+new LogContext("[AssignmentsManager id=" + brokerId + "]"),
+"broker-" + brokerId + "-directory-assignments-manager-");
+}
+
+public void close() throws InterruptedException {
+eventQueue.close();
+channelManager.shutdown();
+}
+
+public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
+eventQueue.append(new AssignmentEvent(time.nanoseconds(), 
topicPartition, dirId));
+}
+
+// only for testing
+void wakeup() {
+eventQueue.wakeup();
+}
+
+/**
+ * Base class for all the events handled by {@link AssignmentsManager}.
+ */
+private abstract static class Event implements EventQueue.Event {
+/**
+ * Override the default behavior in
+ * {@link EventQueue.Event#handleException}
+ * which swallows the exception.
+ */
+@Override
+public void handleException(Throwable e) {
+throw new RuntimeException(e);
+}
+}
+
+/**
+ * Hand
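
   For reference, a minimal usage sketch of the API quoted above, based only on the 
constructor and onAssignment/close signatures shown. The channel manager, broker id, 
epoch callback, topic and directory Uuids below are illustrative placeholders, not the 
PR's actual wiring:

       import java.util.function.Supplier
       import kafka.server.{AssignmentsManager, NodeToControllerChannelManager}
       import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
       import org.apache.kafka.common.utils.Time

       def wireUp(channelManager: NodeToControllerChannelManager,
                  brokerId: Int,
                  brokerEpoch: () => Long): AssignmentsManager = {
         // The manager takes a java.util.function.Supplier for the broker epoch.
         val epochSupplier: Supplier[java.lang.Long] =
           () => java.lang.Long.valueOf(brokerEpoch())
         val manager = new AssignmentsManager(Time.SYSTEM, channelManager, brokerId, epochSupplier)
         // Queue one assignment: partition foo-0 now lives in the directory identified by dirId.
         val topicId = Uuid.randomUuid()
         val dirId = Uuid.randomUuid()
         manager.onAssignment(new TopicIdPartition(topicId, new TopicPartition("foo", 0)), dirId)
         // manager.close() should be called on broker shutdown to drain the event queue.
         manager
       }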

Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-08 Thread via GitHub


soarez commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1386805266


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);
+
+private static final long MAX_BACKOFF_INTERVAL_MS = 
TimeUnit.SECONDS.toNanos(10);
+
+private final Time time;
+private final NodeToControllerChannelManager channelManager;
+private final int brokerId;
+private final Supplier<Long> brokerEpochSupplier;
+private final KafkaEventQueue eventQueue;
+
+// These variables should only be mutated from the event loop thread
+private Map inflight = null;
+private Map pending = new HashMap<>();
+private final ExponentialBackoff resendExponentialBackoff =
+new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
+private int failedAttempts = 0;
+
+public AssignmentsManager(Time time,
+  NodeToControllerChannelManager channelManager,
+  int brokerId,
+  Supplier<Long> brokerEpochSupplier) {
+this.time = time;
+this.channelManager = channelManager;
+this.brokerId = brokerId;
+this.brokerEpochSupplier = brokerEpochSupplier;
+this.eventQueue = new KafkaEventQueue(time,
+new LogContext("[AssignmentsManager id=" + brokerId + "]"),
+"broker-" + brokerId + "-directory-assignments-manager-");
+}
+
+public void close() throws InterruptedException {
+eventQueue.close();
+channelManager.shutdown();
+}
+
+public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
+eventQueue.append(new AssignmentEvent(time.nanoseconds(), 
topicPartition, dirId));
+}
+
+// only for testing
+void wakeup() {
+eventQueue.wakeup();
+}
+
+/**
+ * Base class for all the events handled by {@link AssignmentsManager}.
+ */
+private abstract static class Event implements EventQueue.Event {
+/**
+ * Override the default behavior in
+ * {@link EventQueue.Event#handleException}
+ * which swallows the exception.
+ */
+@Override
+public void handleException(Throwable e) {
+throw new RuntimeException(e);
+}
+}
+
+/**
+ * Hand

Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-08 Thread via GitHub


soarez commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1386810622


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2296,13 +2298,41 @@ class ReplicaManager(val config: KafkaConfig,
 
 if (sendZkNotification)
   if (zkClient.isEmpty) {
-warn("Unable to propagate log dir failure via Zookeeper in KRaft mode")
+val uuid = logManager.directoryId(dir)
+if (uuid.isDefined && lifecycleManager.isDefined)
+  lifecycleManager.get.propagateDirectoryFailure(uuid.get)

Review Comment:
   Ah! I think that variable should get a new name. The "Zk" part of it suggests that 
the notification goes via ZooKeeper rather than via an RPC to the controller, but what 
the flag actually does is determine whether the controller should be told about the 
failure at all – regardless of which notification mechanism is used. I'll update this 
PR to rename that variable.
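
   A hypothetical shape of that rename, sketched from the quoted fragment; 
`notifyController` is an assumed name for what is currently `sendZkNotification`, not 
necessarily the one the PR will end up using:

       if (notifyController) {
         if (zkClient.isEmpty) {
           // KRaft mode: report the failed directory to the controller over RPC
           logManager.directoryId(dir).foreach { uuid =>
             lifecycleManager.foreach(_.propagateDirectoryFailure(uuid))
           }
         } else {
           // ZooKeeper mode: keep the existing ZK-based log dir failure notification
         }
       }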






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-08 Thread via GitHub


pprovenzano commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1386788917


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2296,13 +2298,41 @@ class ReplicaManager(val config: KafkaConfig,
 
 if (sendZkNotification)
   if (zkClient.isEmpty) {
-warn("Unable to propagate log dir failure via Zookeeper in KRaft mode")
+val uuid = logManager.directoryId(dir)
+if (uuid.isDefined && lifecycleManager.isDefined)
+  lifecycleManager.get.propagateDirectoryFailure(uuid.get)

Review Comment:
   Why would sendZkNotification be enabled in normal KRaft mode?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-08 Thread via GitHub


soarez commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1386609951


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2296,13 +2298,41 @@ class ReplicaManager(val config: KafkaConfig,
 
 if (sendZkNotification)
   if (zkClient.isEmpty) {
-warn("Unable to propagate log dir failure via Zookeeper in KRaft mode")
+val uuid = logManager.directoryId(dir)
+if (uuid.isDefined && lifecycleManager.isDefined)
+  lifecycleManager.get.propagateDirectoryFailure(uuid.get)

Review Comment:
   I think I'm missing something: why can this only happen during migration? 
AFAICT this will be called in normal KRaft operation, as long as the 
`logManager.handleLogDirFailure(dir)` call above does not shut down the broker because 
there are no remaining online directories.

   We did agree the broker should fail if a directory fails during migration. 
Apologies, I wasn't planning on making that change as part of this PR, but I can 
look into detecting the migration case and addressing that here too.
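
   To make the sequencing concrete, a simplified sketch of the path being discussed, 
assuming the existing behavior that `handleLogDirFailure` halts the broker when no 
online log directories remain:

       logManager.handleLogDirFailure(dir)  // may halt the process if no online log dirs remain
       // If the broker is still running, the failure still has to reach the controller
       // (KRaft path shown here).
       logManager.directoryId(dir) match {
         case Some(uuid) if lifecycleManager.isDefined =>
           lifecycleManager.get.propagateDirectoryFailure(uuid)
         case _ =>
           warn(s"Unable to propagate failure of log dir $dir to the controller")
       }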



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-07 Thread via GitHub


pprovenzano commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1385399770


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);
+
+private static final long MAX_BACKOFF_INTERVAL_MS = 
TimeUnit.SECONDS.toNanos(10);
+
+private final Time time;
+private final NodeToControllerChannelManager channelManager;
+private final int brokerId;
+private final Supplier<Long> brokerEpochSupplier;
+private final KafkaEventQueue eventQueue;
+
+// These variables should only be mutated from the event loop thread
+private Map inflight = null;
+private Map pending = new HashMap<>();
+private final ExponentialBackoff resendExponentialBackoff =
+new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
+private int failedAttempts = 0;
+
+public AssignmentsManager(Time time,
+  NodeToControllerChannelManager channelManager,
+  int brokerId,
+  Supplier<Long> brokerEpochSupplier) {
+this.time = time;
+this.channelManager = channelManager;
+this.brokerId = brokerId;
+this.brokerEpochSupplier = brokerEpochSupplier;
+this.eventQueue = new KafkaEventQueue(time,
+new LogContext("[AssignmentsManager id=" + brokerId + "]"),
+"broker-" + brokerId + "-directory-assignments-manager-");
+}
+
+public void close() throws InterruptedException {
+eventQueue.close();
+channelManager.shutdown();
+}
+
+public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
+eventQueue.append(new AssignmentEvent(time.nanoseconds(), 
topicPartition, dirId));
+}
+
+// only for testing
+void wakeup() {
+eventQueue.wakeup();
+}
+
+/**
+ * Base class for all the events handled by {@link AssignmentsManager}.
+ */
+private abstract static class Event implements EventQueue.Event {
+/**
+ * Override the default behavior in
+ * {@link EventQueue.Event#handleException}
+ * which swallows the exception.
+ */
+@Override
+public void handleException(Throwable e) {
+throw new RuntimeException(e);
+}
+}
+
+/**
+ *

Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-07 Thread via GitHub


pprovenzano commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1385385403


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2296,13 +2298,41 @@ class ReplicaManager(val config: KafkaConfig,
 
 if (sendZkNotification)
   if (zkClient.isEmpty) {
-warn("Unable to propagate log dir failure via Zookeeper in KRaft mode")
+val uuid = logManager.directoryId(dir)
+if (uuid.isDefined && lifecycleManager.isDefined)
+  lifecycleManager.get.propagateDirectoryFailure(uuid.get)

Review Comment:
   When would we ever get into this state? If this can only occur during 
migration, I thought we agreed that we would fail the broker if a directory 
failed during migration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-06 Thread via GitHub


soarez commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1795077968

   @cmccabe @pprovenzano this one also builds on 
[KAFKA-15451](https://github.com/apache/kafka/pull/14368) (#14368). Please take 
a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org