[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-15 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r670654509



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+private final List replicas;
+private final List isr;
+private final boolean unclean;
+
+PartitionReassignmentRevert(PartitionRegistration registration) {
+// Figure out the replica list and ISR that we will have after 
reverting the
+// reassignment. In general, we want to take out any replica that the 
reassignment
+// was adding, but keep the ones the reassignment was removing. (But 
see the
+// special case below.)
+Set adding = Replicas.toSet(registration.addingReplicas);
+this.replicas = new ArrayList<>(registration.replicas.length);
+this.isr = new ArrayList<>(registration.isr.length);
+for (int i = 0; i < registration.isr.length; i++) {
+int replica = registration.isr[i];
+if (!adding.contains(replica)) {
+this.isr.add(replica);
+}
+}
+for (int replica : registration.replicas) {
+if (!adding.contains(replica)) {
+this.replicas.add(replica);
+}
+}
+if (isr.isEmpty()) {
+// In the special case that all the replicas that are in the ISR 
are also
+// contained in addingReplicas, we choose the first remaining 
replica and add
+// it to the ISR. This is considered an unclean leader election. 
Therefore,
+// calling code must check that unclean leader election is enabled 
before
+// accepting the new ISR.
+if (this.replicas.isEmpty()) {
+// This should not be reachable, since it would require a 
partition
+// starting with an empty replica set prior to the 
reassignment we are
+// trying to revert.
+throw new InvalidReplicaAssignmentException("Invalid replica " 
+
+"assignment: addingReplicas contains all replicas.");
+}
+isr.add(replicas.get(0));

Review comment:
   So the intention is that if the revert is "unclean" and unclean leader 
election is not enabled, the revert operation for that partition returns 
an error rather than succeeding.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r670134315



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+private final List replicas;
+private final List isr;
+private final boolean unclean;
+
+PartitionReassignmentRevert(PartitionRegistration registration) {
+// Figure out the replica list and ISR that we will have after 
reverting the
+// reassignment. In general, we want to take out any replica that the 
reassignment
+// was adding, but keep the ones the reassignment was removing. (But 
see the
+// special case below.)
+Set adding = Replicas.toSet(registration.addingReplicas);
+this.replicas = new ArrayList<>(registration.replicas.length);
+this.isr = new ArrayList<>(registration.isr.length);
+for (int i = 0; i < registration.isr.length; i++) {
+int replica = registration.isr[i];
+if (!adding.contains(replica)) {
+this.isr.add(replica);
+}
+}
+for (int replica : registration.replicas) {
+if (!adding.contains(replica)) {
+this.replicas.add(replica);
+}
+}
+if (isr.isEmpty()) {
+// In the special case that all the replicas that are in the ISR 
are also
+// contained in addingReplicas, we choose the first remaining 
replica and add
+// it to the ISR. This is considered an unclean leader election. 
Therefore,
+// calling code must check that unclean leader election is enabled 
before
+// accepting the new ISR.
+if (this.replicas.isEmpty()) {
+// This should not be reachable, since it would require a 
partition
+// starting with an empty replica set prior to the 
reassignment we are
+// trying to revert.
+throw new InvalidReplicaAssignmentException("Invalid replica " 
+
+"assignment: addingReplicas contains all replicas.");
+}
+isr.add(replicas.get(0));

Review comment:
   It seems simpler to do it here, because then we can reject the unclean 
case before creating the `PartitionChangeBuilder`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r670132470



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionChangeBuilder handles changing partition registrations.
+ */
+public class PartitionChangeBuilder {
+public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
+if (record.isr() != null) return false;
+if (record.leader() != NO_LEADER_CHANGE) return false;
+if (record.replicas() != null) return false;
+if (record.removingReplicas() != null) return false;
+if (record.addingReplicas() != null) return false;
+return true;
+}
+
+private final PartitionRegistration partition;
+private final Uuid topicId;
+private final int partitionId;
+private final Function isAcceptableLeader;
+private final Supplier uncleanElectionOk;
+private List targetIsr;
+private List targetReplicas;
+private List targetRemoving;
+private List targetAdding;
+private boolean alwaysElectPreferredIfPossible;
+
+public PartitionChangeBuilder(PartitionRegistration partition,
+  Uuid topicId,
+  int partitionId,
+  Function 
isAcceptableLeader,
+  Supplier uncleanElectionOk) {
+this.partition = partition;
+this.topicId = topicId;
+this.partitionId = partitionId;
+this.isAcceptableLeader = isAcceptableLeader;
+this.uncleanElectionOk = uncleanElectionOk;
+this.targetIsr = Replicas.toList(partition.isr);
+this.targetReplicas = Replicas.toList(partition.replicas);
+this.targetRemoving = Replicas.toList(partition.removingReplicas);
+this.targetAdding = Replicas.toList(partition.addingReplicas);
+this.alwaysElectPreferredIfPossible = false;
+}
+
+public PartitionChangeBuilder setTargetIsr(List targetIsr) {
+this.targetIsr = targetIsr;
+return this;
+}
+
+public PartitionChangeBuilder setTargetReplicas(List 
targetReplicas) {
+this.targetReplicas = targetReplicas;
+return this;
+}
+
+public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean 
alwaysElectPreferredIfPossible) {
+this.alwaysElectPreferredIfPossible = alwaysElectPreferredIfPossible;
+return this;
+}
+
+public PartitionChangeBuilder setTargetRemoving(List 
targetRemoving) {
+this.targetRemoving = targetRemoving;
+return this;
+}
+
+public PartitionChangeBuilder setTargetAdding(List targetAdding) {
+this.targetAdding = targetAdding;
+return this;
+}
+
+boolean shouldTryElection() {
+// If the new isr doesn't have the current leader, we need to try to 
elect a new
+// one. Note: this also handles the case where the current leader is 
NO_LEADER,
+// since that value cannot appear in targetIsr.
+if (!targetIsr.contains(partition.leader)) return true;
+
+// Check if we want to try to get away from a non-preferred leader.
+if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+
+return false;
+}
+
+class BestLeader {
+final int node;
+

[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669944074



##
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##
@@ -422,4 +351,75 @@ class RaftClusterTest {
   listenerName = listenerName
 )
 
+  @Test
+  def testCreateClusterAndPerformReassignment(): Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(4).
+setNumControllerNodes(3).build()).build()
+try {
+  cluster.format()
+  cluster.startup()
+  cluster.waitForReadyBrokers()
+  val admin = Admin.create(cluster.clientProperties())
+  try {
+// Create the topic.
+val assignments = new util.HashMap[Integer, util.List[Integer]]
+assignments.put(0, Arrays.asList(0, 1, 2))
+assignments.put(1, Arrays.asList(1, 2, 3))
+assignments.put(2, Arrays.asList(2, 3, 0))
+val createTopicResult = admin.createTopics(Collections.singletonList(
+  new NewTopic("foo", assignments)))
+createTopicResult.all().get()
+waitForTopicListing(admin, Seq("foo"), Seq())
+
+// Start some reassignments.
+assertEquals(Collections.emptyMap(), 
admin.listPartitionReassignments().reassignments().get())
+val reassignments = new util.HashMap[TopicPartition, 
Optional[NewPartitionReassignment]]
+reassignments.put(new TopicPartition("foo", 0),
+  Optional.of(new NewPartitionReassignment(Arrays.asList(2, 1, 0
+reassignments.put(new TopicPartition("foo", 1),
+  Optional.of(new NewPartitionReassignment(Arrays.asList(0, 1, 2

Review comment:
   Good idea




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669926069



##
File path: 
metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
##
@@ -105,4 +105,42 @@ public void testToLeaderAndIsrPartitionState() {
 setIsNew(false).toString(),
 b.toLeaderAndIsrPartitionState(new TopicPartition("bar", 0), 
false).toString());
 }
+
+@Test
+public void testMergePartitionChangeRecordWithReassignmentData() {
+PartitionRegistration partition0 = new PartitionRegistration(new int[] 
{1, 2, 3},
+new int[] {1, 2, 3}, Replicas.NONE, Replicas.NONE, 1, 100, 200);
+PartitionRegistration partition1 = partition0.merge(new 
PartitionChangeRecord().
+setRemovingReplicas(Collections.singletonList(3)).
+setAddingReplicas(Collections.singletonList(4)).
+setReplicas(Arrays.asList(1, 2, 3, 4)));
+assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4},
+new int[] {1, 2, 3}, new int[] {3}, new int[] {4}, 1, 100, 201), 
partition1);
+PartitionRegistration partition2 = partition1.merge(new 
PartitionChangeRecord().
+setIsr(Arrays.asList(1, 2, 4)).
+setRemovingReplicas(Collections.emptyList()).
+setAddingReplicas(Collections.emptyList()).
+setReplicas(Arrays.asList(1, 2, 4)));
+assertEquals(new PartitionRegistration(new int[] {1, 2, 4},
+new int[] {1, 2, 4}, Replicas.NONE, Replicas.NONE, 1, 100, 202), 
partition2);
+assertFalse(partition2.isReassigning());
+}
+
+@Test
+public void testPartitionControlInfoIsrChangeCompletesReassignment() {
+PartitionRegistration partition0 = new PartitionRegistration(
+new int[]{1, 2, 3, 4}, new int[]{3}, new int[]{3}, new int[] {}, 
1, 0, 0);

Review comment:
   Good idea. But on a second look, we can remove this test, because the 
function it covers has been moved out of `PartitionRegistration`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669925103



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.controller.PartitionChangeBuilder.BestLeader;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static 
org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class PartitionChangeBuilderTest {
+@Test
+public void testChangeRecordIsNoOp() {
+assertTrue(changeRecordIsNoOp(new PartitionChangeRecord()));
+assertFalse(changeRecordIsNoOp(new 
PartitionChangeRecord().setLeader(1)));
+assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+setIsr(Arrays.asList(1, 2, 3;
+assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+setRemovingReplicas(Arrays.asList(1;
+assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+setAddingReplicas(Arrays.asList(4;
+}
+
+private final static PartitionRegistration FOO = new PartitionRegistration(
+new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE,
+1, 100, 200);
+
+private final static Uuid FOO_ID = 
Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
+
+private static PartitionChangeBuilder createFooBuilder(boolean 
allowUnclean) {
+return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, () -> 
allowUnclean);
+}
+
+private final static PartitionRegistration BAR = new PartitionRegistration(
+new int[] {1, 2, 3, 4}, new int[] {1, 2, 3}, new int[] {1}, new int[] 
{4},
+1, 100, 200);
+
+private final static Uuid BAR_ID = 
Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
+
+private static PartitionChangeBuilder createBarBuilder(boolean 
allowUnclean) {
+return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, () -> 
allowUnclean);
+}
+
+private static void assertBestLeaderEquals(PartitionChangeBuilder builder,
+   int expectedNode,
+   boolean expectedUnclean) {
+BestLeader bestLeader = builder.new BestLeader();
+assertEquals(expectedNode, bestLeader.node);
+assertEquals(expectedUnclean, bestLeader.unclean);
+}
+
+@Test
+public void testBestLeader() {
+assertBestLeaderEquals(createFooBuilder(false), 2, false);
+assertBestLeaderEquals(createFooBuilder(true), 2, false);
+assertBestLeaderEquals(createFooBuilder(false).
+setTargetIsr(Arrays.asList(1, 3)), 1, false);
+assertBestLeaderEquals(createFooBuilder(true).
+setTargetIsr(Arrays.asList(1, 3)), 1, false);
+assertBestLeaderEquals(createFooBuilder(false).
+setTargetIsr(Arrays.asList(3)), NO_LEADER, false);
+assertBestLeaderEquals(createFooBuilder(true).
+setTargetIsr(Arrays.asList(3)), 2, true);
+assertBestLeaderEquals(createFooBuilder(true).
+
setTargetIsr(Arrays.asList(4)).setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
+4, false);
+}
+
+@Test
+public void testShouldTryElection() {
+

[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669911056



##
File path: metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
##
@@ -103,7 +103,8 @@ public TopicImage apply() {
 for (Entry entry : 
partitionChanges.entrySet()) {
 if (entry.getValue().leader == brokerId) {
 PartitionRegistration prevPartition = 
image.partitions().get(entry.getKey());
-if (prevPartition == null || prevPartition.leader != brokerId) 
{
+if (prevPartition == null ||
+prevPartition.leaderEpoch != 
entry.getValue().leaderEpoch) {

Review comment:
   Good point. Thinking about it more, we can just check the partition 
epoch here, since the leader epoch can't change without the partition epoch changing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669896829



##
File path: 
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
##
@@ -180,6 +187,44 @@ public LeaderAndIsrPartitionState 
toLeaderAndIsrPartitionState(TopicPartition tp
 setIsNew(isNew);
 }
 
+/**
+ * Returns true if this partition is reassigning.
+ */
+public boolean isReassigning() {
+return removingReplicas.length > 0 | addingReplicas.length > 0;
+}
+
+/**
+ * Check if an ISR change completes this partition's reassignment.
+ *
+ * @param newIsrThe new ISR.
+ * @return  True if the reassignment is complete.
+ */
+public boolean isrChangeCompletesReassignment(int[] newIsr) {

Review comment:
   Yes, this is unused now. I'll remove it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669891898



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+private final List replicas;
+private final List isr;
+
+PartitionReassignmentRevert(PartitionRegistration registration) {
+// Figure out the replica list and ISR that we will have after 
reverting the
+// reassignment. In general, we want to take out any replica that the 
reassignment
+// was adding, but keep the ones the reassignment was removing. (But 
see the
+// special case below.)
+Set adding = Replicas.toSet(registration.addingReplicas);
+this.replicas = new ArrayList<>(registration.replicas.length);
+this.isr = new ArrayList<>(registration.isr.length);
+for (int i = 0; i < registration.isr.length; i++) {
+int replica = registration.isr[i];
+if (!adding.contains(replica)) {
+this.isr.add(replica);
+} else if (i == registration.isr.length - 1 && isr.isEmpty()) {
+// This is a special case where taking out all the "adding" 
replicas is
+// not possible. The reason it is not possible is that doing 
so would
+// create an empty ISR, which is not allowed.
+//
+// In this case, we leave in one of the adding replicas 
permanently.

Review comment:
   I think we can simply disallow cancelling the reassignment in this 
situation. It is still possible to undo the reassignment by creating a 
new reassignment, so the user is not completely stuck.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669215608



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionMutator handles changing partition registrations.
+ */
+public class PartitionChangeBuilder {
+public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
+if (record.isr() != null) return false;
+if (record.leader() != NO_LEADER_CHANGE) return false;
+if (record.replicas() != null) return false;
+if (record.removingReplicas() != null) return false;
+if (record.addingReplicas() != null) return false;
+return true;
+}
+
+private final PartitionRegistration partition;
+private final Uuid topicId;
+private final int partitionId;
+private final Function isAcceptableLeader;
+private final Supplier uncleanElectionOk;
+private List targetIsr;
+private List targetReplicas;
+private List targetRemoving;
+private List targetAdding;
+private boolean alwaysElectPreferredIfPossible;
+
+public PartitionChangeBuilder(PartitionRegistration partition,
+  Uuid topicId,
+  int partitionId,
+  Function 
isAcceptableLeader,
+  Supplier uncleanElectionOk) {
+this.partition = partition;
+this.topicId = topicId;
+this.partitionId = partitionId;
+this.isAcceptableLeader = isAcceptableLeader;
+this.uncleanElectionOk = uncleanElectionOk;
+this.targetIsr = Replicas.toList(partition.isr);
+this.targetReplicas = Replicas.toList(partition.replicas);
+this.targetRemoving = Replicas.toList(partition.removingReplicas);
+this.targetAdding = Replicas.toList(partition.addingReplicas);
+this.alwaysElectPreferredIfPossible = false;
+}
+
+public PartitionChangeBuilder setTargetIsr(List targetIsr) {
+this.targetIsr = targetIsr;
+return this;
+}
+
+public PartitionChangeBuilder setTargetReplicas(List 
targetReplicas) {
+this.targetReplicas = targetReplicas;
+return this;
+}
+
+public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean 
alwaysElectPreferredIfPossible) {

Review comment:
   I also added a unit test for `electLeaders`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-13 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669215608



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionMutator handles changing partition registrations.
+ */
+public class PartitionChangeBuilder {
+public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
+if (record.isr() != null) return false;
+if (record.leader() != NO_LEADER_CHANGE) return false;
+if (record.replicas() != null) return false;
+if (record.removingReplicas() != null) return false;
+if (record.addingReplicas() != null) return false;
+return true;
+}
+
+private final PartitionRegistration partition;
+private final Uuid topicId;
+private final int partitionId;
+private final Function isAcceptableLeader;
+private final Supplier uncleanElectionOk;
+private List targetIsr;
+private List targetReplicas;
+private List targetRemoving;
+private List targetAdding;
+private boolean alwaysElectPreferredIfPossible;
+
+public PartitionChangeBuilder(PartitionRegistration partition,
+  Uuid topicId,
+  int partitionId,
+  Function 
isAcceptableLeader,
+  Supplier uncleanElectionOk) {
+this.partition = partition;
+this.topicId = topicId;
+this.partitionId = partitionId;
+this.isAcceptableLeader = isAcceptableLeader;
+this.uncleanElectionOk = uncleanElectionOk;
+this.targetIsr = Replicas.toList(partition.isr);
+this.targetReplicas = Replicas.toList(partition.replicas);
+this.targetRemoving = Replicas.toList(partition.removingReplicas);
+this.targetAdding = Replicas.toList(partition.addingReplicas);
+this.alwaysElectPreferredIfPossible = false;
+}
+
+public PartitionChangeBuilder setTargetIsr(List targetIsr) {
+this.targetIsr = targetIsr;
+return this;
+}
+
+public PartitionChangeBuilder setTargetReplicas(List 
targetReplicas) {
+this.targetReplicas = targetReplicas;
+return this;
+}
+
+public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean 
alwaysElectPreferredIfPossible) {

Review comment:
   I also added a unit test for `electLeaders`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-13 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669203100



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+private final List replicas;
+private final List isr;
+
+PartitionReassignmentRevert(PartitionRegistration registration) {
+// Figure out the replica list and ISR that we will have after 
reverting the
+// reassignment. In general, we want to take out any replica that the 
reassignment
+// was adding, but keep the ones the reassignment was removing. (But 
see the
+// special case below.)
+Set adding = Replicas.toSet(registration.addingReplicas);
+this.replicas = new ArrayList<>(registration.replicas.length);
+this.isr = new ArrayList<>(registration.isr.length);
+for (int i = 0; i < registration.isr.length; i++) {
+int replica = registration.isr[i];
+if (!adding.contains(replica)) {
+this.isr.add(replica);
+} else if (i == registration.isr.length - 1 && isr.isEmpty()) {
+// This is a special case where taking out all the "adding" 
replicas is
+// not possible. The reason it is not possible is that doing 
so would
+// create an empty ISR, which is not allowed.
+//
+// In this case, we leave in one of the adding replicas 
permanently.

Review comment:
   It's not possible to revert to the original replicas in all cases, 
though. Specifically, if none of the original replicas are in the ISR, you 
can't do such a revert without data loss. If the old controller is doing that 
unconditionally, it seems like a bug.
   
   I suppose we could also fail the revert operation, but that doesn't seem 
like useful behavior from the user's point of view.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-13 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669201808



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionMutator handles changing partition registrations.
+ */
+public class PartitionChangeBuilder {
+public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
+if (record.isr() != null) return false;
+if (record.leader() != NO_LEADER_CHANGE) return false;
+if (record.replicas() != null) return false;
+if (record.removingReplicas() != null) return false;
+if (record.addingReplicas() != null) return false;
+return true;
+}
+
+private final PartitionRegistration partition;
+private final Uuid topicId;
+private final int partitionId;
+private final Function isAcceptableLeader;
+private final Supplier uncleanElectionOk;
+private List targetIsr;
+private List targetReplicas;
+private List targetRemoving;
+private List targetAdding;
+private boolean alwaysElectPreferredIfPossible;
+
+public PartitionChangeBuilder(PartitionRegistration partition,
+  Uuid topicId,
+  int partitionId,
+  Function 
isAcceptableLeader,
+  Supplier uncleanElectionOk) {
+this.partition = partition;
+this.topicId = topicId;
+this.partitionId = partitionId;
+this.isAcceptableLeader = isAcceptableLeader;
+this.uncleanElectionOk = uncleanElectionOk;
+this.targetIsr = Replicas.toList(partition.isr);
+this.targetReplicas = Replicas.toList(partition.replicas);
+this.targetRemoving = Replicas.toList(partition.removingReplicas);
+this.targetAdding = Replicas.toList(partition.addingReplicas);
+this.alwaysElectPreferredIfPossible = false;
+}
+
+public PartitionChangeBuilder setTargetIsr(List targetIsr) {
+this.targetIsr = targetIsr;
+return this;
+}
+
+public PartitionChangeBuilder setTargetReplicas(List 
targetReplicas) {
+this.targetReplicas = targetReplicas;
+return this;
+}
+
+public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean 
alwaysElectPreferredIfPossible) {

Review comment:
   Good catch. This is supposed to be set when we're handling the 
`electLeaders` RPC, since in that case you want to move leadership to the preferred 
leader if the partition is not already led by it. Should be fixed now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-12 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667727196



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
##
@@ -90,6 +97,13 @@ public Integer value() {
 return preferredReplicaImbalanceCount;
 }
 });
+this.reassigningPartitionCountGauge = omitBrokerMetrics ? null :

Review comment:
   I took this out. (We'll add the metrics in a later change.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-12 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667726749



##
File path: 
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
##
@@ -29,6 +29,15 @@
   "about": "null if the ISR didn't change; the new in-sync replicas 
otherwise." },
 { "name": "Leader", "type": "int32", "default": "-2", "entityType": 
"brokerId",
   "versions": "0+", "taggedVersions": "0+", "tag": 1,
-  "about": "-1 if there is now no leader; -2 if the leader didn't change; 
the new leader otherwise." }
+  "about": "-1 if there is now no leader; -2 if the leader didn't change; 
the new leader otherwise." },
+{ "name": "Replicas", "type": "[]int32", "default": "null", "entityType": 
"brokerId",
+  "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", 
"tag": 2,
+  "about": "null if the replicas didn't change; the new replicas 
otherwise." },
+{ "name": "RemovingReplicas", "type": "[]int32", "default": "null", 
"entityType": "brokerId",

Review comment:
   A null value means that the PartitionChangeRecord didn't change that field. That 
possibility doesn't exist in PartitionRecord, which must spell out the full 
values of all fields.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-12 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667727058



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -,19 +,19 @@ private QuorumController(LogContext logContext,
 }
 return appendWriteEvent("alterPartitionReassignments",
 time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), 
MILLISECONDS),
-() -> {
-throw new UnsupportedOperationException();
-});
+() -> replicationControl.alterPartitionReassignments(request));
 }
 
 @Override
 public CompletableFuture
 listPartitionReassignments(ListPartitionReassignmentsRequestData 
request) {
+if (request.topics() != null && request.topics().isEmpty()) {
+return CompletableFuture.completedFuture(
+new 
ListPartitionReassignmentsResponseData().setErrorMessage(null));

Review comment:
   Unfortunately, the default value of ErrorMessage is not null. I could change the default, but 
that would require changing a bunch of other code, so I left it as-is.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-12 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667726466



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
 }
 }
 
+PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+ int partitionId,
+ PartitionControlInfo 
partition,
+ int[] newIsr,
+ Function isAcceptableLeader) {
+PartitionChangeRecord record = new PartitionChangeRecord().
+setTopicId(topic.id).
+setPartitionId(partitionId);
+int[] newReplicas = partition.replicas;
+if (partition.isrChangeCompletesReassignment(newIsr)) {
+if (partition.addingReplicas.length > 0) {
+record.setAddingReplicas(Collections.emptyList());
+}
+if (partition.removingReplicas.length > 0) {
+record.setRemovingReplicas(Collections.emptyList());
+newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+}
+}
+int newLeader;
+if (Replicas.contains(newIsr, partition.leader)) {
+// If the current leader is good, don't change.
+newLeader = partition.leader;
+} else {
+// Choose a new leader.
+boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+}
+if (!electionWasClean(newLeader, newIsr)) {
+// After an unclean leader election, the ISR is reset to just the 
new leader.
+newIsr = new int[] {newLeader};
+} else if (newIsr.length == 0) {
+// We never want to shrink the ISR to size 0.
+newIsr = partition.isr;
+}
+if (newLeader != partition.leader) record.setLeader(newLeader);
+if (!Arrays.equals(newIsr, partition.isr)) {
+record.setIsr(Replicas.toList(newIsr));
+}
+if (!Arrays.equals(newReplicas, partition.replicas)) {
+record.setReplicas(Replicas.toList(newReplicas));
+}
+return record;
+}
+
+ControllerResult
+alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+List records = new ArrayList<>();
+AlterPartitionReassignmentsResponseData result =
+new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+int successfulAlterations = 0, totalAlterations = 0;
+for (ReassignableTopic topic : request.topics()) {
+ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+setName(topic.name());
+for (ReassignablePartition partition : topic.partitions()) {
+ApiError error = ApiError.NONE;
+try {
+alterPartitionReassignment(topic.name(), partition, 
records);
+successfulAlterations++;
+} catch (Throwable e) {
+log.info("Unable to alter partition reassignment for " +
+topic.name() + ":" + partition.partitionIndex() + " 
because " +
+"of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+error = ApiError.fromThrowable(e);
+}
+totalAlterations++;
+topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+setPartitionIndex(partition.partitionIndex()).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()));
+}
+result.responses().add(topicResponse);
+}
+log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+successfulAlterations, totalAlterations);
+return ControllerResult.atomicOf(records, result);
+}
+
+void alterPartitionReassignment(String topicName,
+ReassignablePartition partition,
+List records) {
+Uuid topicId = topicsByName.get(topicName);
+if (topicId == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+"named " + topicName + ".");
+}
+TopicControlInfo topicInfo = topics.get(topicId);
+if (topicInfo == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" 

[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-12 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667726212



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -711,56 +830,73 @@ BrokersToIsrs brokersToIsrs() {
 }
 TopicControlInfo topic = topics.get(topicId);
 for (AlterIsrRequestData.PartitionData partitionData : 
topicData.partitions()) {
-PartitionControlInfo partition = 
topic.parts.get(partitionData.partitionIndex());
+int partitionId = partitionData.partitionIndex();
+PartitionControlInfo partition = topic.parts.get(partitionId);
 if (partition == null) {
 responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-setPartitionIndex(partitionData.partitionIndex()).
+setPartitionIndex(partitionId).
 setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()));
 continue;
 }
 if (request.brokerId() != partition.leader) {
 responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-setPartitionIndex(partitionData.partitionIndex()).
+setPartitionIndex(partitionId).
 setErrorCode(INVALID_REQUEST.code()));
 continue;
 }
 if (partitionData.leaderEpoch() != partition.leaderEpoch) {
 responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-setPartitionIndex(partitionData.partitionIndex()).
-setErrorCode(Errors.FENCED_LEADER_EPOCH.code()));
+setPartitionIndex(partitionId).
+setErrorCode(FENCED_LEADER_EPOCH.code()));
 continue;
 }
 if (partitionData.currentIsrVersion() != 
partition.partitionEpoch) {
 responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-setPartitionIndex(partitionData.partitionIndex()).
+setPartitionIndex(partitionId).
 setErrorCode(Errors.INVALID_UPDATE_VERSION.code()));
 continue;
 }
 int[] newIsr = Replicas.toArray(partitionData.newIsr());
 if (!Replicas.validateIsr(partition.replicas, newIsr)) {
 responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-setPartitionIndex(partitionData.partitionIndex()).
+setPartitionIndex(partitionId).
 setErrorCode(INVALID_REQUEST.code()));
 continue;
 }
 if (!Replicas.contains(newIsr, partition.leader)) {
 // An alterIsr request can't remove the current leader.
 responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-setPartitionIndex(partitionData.partitionIndex()).
+setPartitionIndex(partitionId).
 setErrorCode(INVALID_REQUEST.code()));
 continue;
 }
-records.add(new ApiMessageAndVersion(new 
PartitionChangeRecord().
-setPartitionId(partitionData.partitionIndex()).
-setTopicId(topic.id).
-setIsr(partitionData.newIsr()), (short) 0));
-responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-setPartitionIndex(partitionData.partitionIndex()).
-setErrorCode(Errors.NONE.code()).
-setLeaderId(partition.leader).
-setLeaderEpoch(partition.leaderEpoch).
-setCurrentIsrVersion(partition.partitionEpoch + 1).
-setIsr(partitionData.newIsr()));
+PartitionChangeRecord record = 
generateLeaderAndIsrUpdate(topic,

Review comment:
   Yes, let's bump up the leader epoch here. Should be fixed now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-12 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667725993



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
 }
 }
 
+PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+ int partitionId,
+ PartitionControlInfo 
partition,
+ int[] newIsr,
+ Function isAcceptableLeader) {
+PartitionChangeRecord record = new PartitionChangeRecord().
+setTopicId(topic.id).
+setPartitionId(partitionId);
+int[] newReplicas = partition.replicas;
+if (partition.isrChangeCompletesReassignment(newIsr)) {
+if (partition.addingReplicas.length > 0) {
+record.setAddingReplicas(Collections.emptyList());
+}
+if (partition.removingReplicas.length > 0) {
+record.setRemovingReplicas(Collections.emptyList());
+newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+}
+}
+int newLeader;
+if (Replicas.contains(newIsr, partition.leader)) {
+// If the current leader is good, don't change.
+newLeader = partition.leader;
+} else {
+// Choose a new leader.
+boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+}
+if (!electionWasClean(newLeader, newIsr)) {
+// After an unclean leader election, the ISR is reset to just the 
new leader.
+newIsr = new int[] {newLeader};
+} else if (newIsr.length == 0) {
+// We never want to shrink the ISR to size 0.
+newIsr = partition.isr;
+}
+if (newLeader != partition.leader) record.setLeader(newLeader);
+if (!Arrays.equals(newIsr, partition.isr)) {
+record.setIsr(Replicas.toList(newIsr));
+}
+if (!Arrays.equals(newReplicas, partition.replicas)) {
+record.setReplicas(Replicas.toList(newReplicas));
+}
+return record;
+}
+
+ControllerResult
+alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+List records = new ArrayList<>();
+AlterPartitionReassignmentsResponseData result =
+new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+int successfulAlterations = 0, totalAlterations = 0;
+for (ReassignableTopic topic : request.topics()) {
+ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+setName(topic.name());
+for (ReassignablePartition partition : topic.partitions()) {
+ApiError error = ApiError.NONE;
+try {
+alterPartitionReassignment(topic.name(), partition, 
records);
+successfulAlterations++;
+} catch (Throwable e) {
+log.info("Unable to alter partition reassignment for " +
+topic.name() + ":" + partition.partitionIndex() + " 
because " +
+"of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+error = ApiError.fromThrowable(e);
+}
+totalAlterations++;
+topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+setPartitionIndex(partition.partitionIndex()).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()));
+}
+result.responses().add(topicResponse);
+}
+log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+successfulAlterations, totalAlterations);
+return ControllerResult.atomicOf(records, result);
+}
+
+void alterPartitionReassignment(String topicName,
+ReassignablePartition partition,
+List records) {
+Uuid topicId = topicsByName.get(topicName);
+if (topicId == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+"named " + topicName + ".");
+}
+TopicControlInfo topicInfo = topics.get(topicId);
+if (topicInfo == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" 

[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-12 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667238459



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
 }
 }
 
+PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+ int partitionId,
+ PartitionControlInfo 
partition,
+ int[] newIsr,
+ Function isAcceptableLeader) {
+PartitionChangeRecord record = new PartitionChangeRecord().
+setTopicId(topic.id).
+setPartitionId(partitionId);
+int[] newReplicas = partition.replicas;
+if (partition.isrChangeCompletesReassignment(newIsr)) {
+if (partition.addingReplicas.length > 0) {
+record.setAddingReplicas(Collections.emptyList());
+}
+if (partition.removingReplicas.length > 0) {
+record.setRemovingReplicas(Collections.emptyList());
+newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+}
+}
+int newLeader;
+if (Replicas.contains(newIsr, partition.leader)) {
+// If the current leader is good, don't change.
+newLeader = partition.leader;
+} else {
+// Choose a new leader.
+boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+}
+if (!electionWasClean(newLeader, newIsr)) {
+// After an unclean leader election, the ISR is reset to just the 
new leader.
+newIsr = new int[] {newLeader};
+} else if (newIsr.length == 0) {
+// We never want to shrink the ISR to size 0.
+newIsr = partition.isr;
+}
+if (newLeader != partition.leader) record.setLeader(newLeader);
+if (!Arrays.equals(newIsr, partition.isr)) {
+record.setIsr(Replicas.toList(newIsr));
+}
+if (!Arrays.equals(newReplicas, partition.replicas)) {
+record.setReplicas(Replicas.toList(newReplicas));
+}
+return record;
+}
+
+ControllerResult
+alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+List records = new ArrayList<>();
+AlterPartitionReassignmentsResponseData result =
+new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+int successfulAlterations = 0, totalAlterations = 0;
+for (ReassignableTopic topic : request.topics()) {
+ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+setName(topic.name());
+for (ReassignablePartition partition : topic.partitions()) {
+ApiError error = ApiError.NONE;
+try {
+alterPartitionReassignment(topic.name(), partition, 
records);
+successfulAlterations++;
+} catch (Throwable e) {
+log.info("Unable to alter partition reassignment for " +
+topic.name() + ":" + partition.partitionIndex() + " 
because " +
+"of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+error = ApiError.fromThrowable(e);
+}
+totalAlterations++;
+topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+setPartitionIndex(partition.partitionIndex()).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()));
+}
+result.responses().add(topicResponse);
+}
+log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+successfulAlterations, totalAlterations);
+return ControllerResult.atomicOf(records, result);
+}
+
+void alterPartitionReassignment(String topicName,
+ReassignablePartition partition,
+List records) {
+Uuid topicId = topicsByName.get(topicName);
+if (topicId == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+"named " + topicName + ".");
+}
+TopicControlInfo topicInfo = topics.get(topicId);
+if (topicInfo == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" 

[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-10 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667401789



##
File path: 
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
##
@@ -29,6 +29,15 @@
   "about": "null if the ISR didn't change; the new in-sync replicas 
otherwise." },
 { "name": "Leader", "type": "int32", "default": "-2", "entityType": 
"brokerId",
   "versions": "0+", "taggedVersions": "0+", "tag": 1,
-  "about": "-1 if there is now no leader; -2 if the leader didn't change; 
the new leader otherwise." }
+  "about": "-1 if there is now no leader; -2 if the leader didn't change; 
the new leader otherwise." },
+{ "name": "Replicas", "type": "[]int32", "default": "null", "entityType": 
"brokerId",
+  "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", 
"tag": 2,

Review comment:
   We decided to reset the record versions to 0, since we dropped compatibility 
with 2.8. We bumped the frame version so that the incompatibility can be 
detected.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-09 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667240160



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
 }
 }
 
+PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+ int partitionId,
+ PartitionControlInfo 
partition,
+ int[] newIsr,
+ Function isAcceptableLeader) {
+PartitionChangeRecord record = new PartitionChangeRecord().
+setTopicId(topic.id).
+setPartitionId(partitionId);
+int[] newReplicas = partition.replicas;
+if (partition.isrChangeCompletesReassignment(newIsr)) {
+if (partition.addingReplicas.length > 0) {
+record.setAddingReplicas(Collections.emptyList());
+}
+if (partition.removingReplicas.length > 0) {
+record.setRemovingReplicas(Collections.emptyList());
+newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+}
+}
+int newLeader;
+if (Replicas.contains(newIsr, partition.leader)) {
+// If the current leader is good, don't change.
+newLeader = partition.leader;
+} else {
+// Choose a new leader.
+boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+}
+if (!electionWasClean(newLeader, newIsr)) {
+// After an unclean leader election, the ISR is reset to just the 
new leader.
+newIsr = new int[] {newLeader};
+} else if (newIsr.length == 0) {
+// We never want to shrink the ISR to size 0.
+newIsr = partition.isr;
+}
+if (newLeader != partition.leader) record.setLeader(newLeader);
+if (!Arrays.equals(newIsr, partition.isr)) {
+record.setIsr(Replicas.toList(newIsr));
+}
+if (!Arrays.equals(newReplicas, partition.replicas)) {
+record.setReplicas(Replicas.toList(newReplicas));
+}
+return record;
+}
+
+ControllerResult
+alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+List records = new ArrayList<>();
+AlterPartitionReassignmentsResponseData result =
+new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+int successfulAlterations = 0, totalAlterations = 0;
+for (ReassignableTopic topic : request.topics()) {
+ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+setName(topic.name());
+for (ReassignablePartition partition : topic.partitions()) {
+ApiError error = ApiError.NONE;
+try {
+alterPartitionReassignment(topic.name(), partition, 
records);
+successfulAlterations++;
+} catch (Throwable e) {
+log.info("Unable to alter partition reassignment for " +
+topic.name() + ":" + partition.partitionIndex() + " 
because " +
+"of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+error = ApiError.fromThrowable(e);
+}
+totalAlterations++;
+topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+setPartitionIndex(partition.partitionIndex()).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()));
+}
+result.responses().add(topicResponse);
+}
+log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+successfulAlterations, totalAlterations);
+return ControllerResult.atomicOf(records, result);
+}
+
+void alterPartitionReassignment(String topicName,
+ReassignablePartition partition,
+List records) {
+Uuid topicId = topicsByName.get(topicName);
+if (topicId == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+"named " + topicName + ".");
+}
+TopicControlInfo topicInfo = topics.get(topicId);
+if (topicInfo == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" 

[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-09 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667238459



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
 }
 }
 
+PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+ int partitionId,
+ PartitionControlInfo 
partition,
+ int[] newIsr,
+ Function isAcceptableLeader) {
+PartitionChangeRecord record = new PartitionChangeRecord().
+setTopicId(topic.id).
+setPartitionId(partitionId);
+int[] newReplicas = partition.replicas;
+if (partition.isrChangeCompletesReassignment(newIsr)) {
+if (partition.addingReplicas.length > 0) {
+record.setAddingReplicas(Collections.emptyList());
+}
+if (partition.removingReplicas.length > 0) {
+record.setRemovingReplicas(Collections.emptyList());
+newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+}
+}
+int newLeader;
+if (Replicas.contains(newIsr, partition.leader)) {
+// If the current leader is good, don't change.
+newLeader = partition.leader;
+} else {
+// Choose a new leader.
+boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+}
+if (!electionWasClean(newLeader, newIsr)) {
+// After an unclean leader election, the ISR is reset to just the 
new leader.
+newIsr = new int[] {newLeader};
+} else if (newIsr.length == 0) {
+// We never want to shrink the ISR to size 0.
+newIsr = partition.isr;
+}
+if (newLeader != partition.leader) record.setLeader(newLeader);
+if (!Arrays.equals(newIsr, partition.isr)) {
+record.setIsr(Replicas.toList(newIsr));
+}
+if (!Arrays.equals(newReplicas, partition.replicas)) {
+record.setReplicas(Replicas.toList(newReplicas));
+}
+return record;
+}
+
+ControllerResult
+alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+List records = new ArrayList<>();
+AlterPartitionReassignmentsResponseData result =
+new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+int successfulAlterations = 0, totalAlterations = 0;
+for (ReassignableTopic topic : request.topics()) {
+ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+setName(topic.name());
+for (ReassignablePartition partition : topic.partitions()) {
+ApiError error = ApiError.NONE;
+try {
+alterPartitionReassignment(topic.name(), partition, 
records);
+successfulAlterations++;
+} catch (Throwable e) {
+log.info("Unable to alter partition reassignment for " +
+topic.name() + ":" + partition.partitionIndex() + " 
because " +
+"of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+error = ApiError.fromThrowable(e);
+}
+totalAlterations++;
+topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+setPartitionIndex(partition.partitionIndex()).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()));
+}
+result.responses().add(topicResponse);
+}
+log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+successfulAlterations, totalAlterations);
+return ControllerResult.atomicOf(records, result);
+}
+
+void alterPartitionReassignment(String topicName,
+ReassignablePartition partition,
+List records) {
+Uuid topicId = topicsByName.get(topicName);
+if (topicId == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+"named " + topicName + ".");
+}
+TopicControlInfo topicInfo = topics.get(topicId);
+if (topicInfo == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" 

[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-09 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667232772



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+
+class RemovingAndAddingReplicas {
+private final Set removing;
+private final Set adding;
+
+RemovingAndAddingReplicas(int[] removing, int[] adding) {
+this.removing = new HashSet<>();
+for (int replica : removing) {
+this.removing.add(replica);
+}
+this.adding = new HashSet<>();
+for (int replica : adding) {
+this.adding.add(replica);
+}
+}
+
+RemovingAndAddingReplicas(List removing, List adding) {
+this.removing = new HashSet<>(removing);
+this.adding = new HashSet<>(adding);
+}
+
+/**
+ * Calculate what replicas need to be added and removed to reach a 
specific target
+ * replica list.
+ *
+ * @param currentReplicas   The current replica list.
+ * @param targetReplicasThe target replica list.
+ *
+ * @return  An object containing the removing and adding 
replicas.
+ */
+static RemovingAndAddingReplicas forTarget(List currentReplicas,
+   List targetReplicas) {
+List removingReplicas = new ArrayList<>();
+List addingReplicas = new ArrayList<>();
+List sortedCurrentReplicas = new ArrayList<>(currentReplicas);
+sortedCurrentReplicas.sort(Integer::compareTo);
+List sortedTargetReplicas = new ArrayList<>(targetReplicas);
+sortedTargetReplicas.sort(Integer::compareTo);
+int i = 0, j = 0;
+while (true) {
+if (i < sortedCurrentReplicas.size()) {
+int currentReplica = sortedCurrentReplicas.get(i);
+if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+if (currentReplica < targetReplica) {
+removingReplicas.add(currentReplica);
+i++;
+} else if (currentReplica > targetReplica) {
+addingReplicas.add(targetReplica);
+j++;
+} else {
+i++;
+j++;
+}
+} else {
+removingReplicas.add(currentReplica);
+i++;
+}
+} else if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+addingReplicas.add(targetReplica);
+j++;
+} else {
+break;
+}
+}
+return new RemovingAndAddingReplicas(removingReplicas, addingReplicas);
+}
+
+/**
+ * Calculate the merged replica list following a reassignment.
+ *
+ * The merged list will contain all of the target replicas, in the order 
they appear
+ * in the target list.  It will also contain existing replicas that are 
scheduled to
+ * be removed.
+ *
+ * If a removing replica was in position X in the original replica list, 
it will
+ * appear in the merged list following the appearance of X non-new 
replicas.
+ *
+ * @param currentReplicas   The current replica list.
+ * @param targetReplicasThe target replica list.
+ *
+ * @return  The merged replica list.
+ */
+List calculateMergedReplicas(List currentReplicas,

Review comment:
   I removed this class -- see above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For 

[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-09 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r665801100



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+
+class RemovingAndAddingReplicas {
+private final Set removing;
+private final Set adding;
+
+RemovingAndAddingReplicas(int[] removing, int[] adding) {
+this.removing = new HashSet<>();
+for (int replica : removing) {
+this.removing.add(replica);
+}
+this.adding = new HashSet<>();
+for (int replica : adding) {
+this.adding.add(replica);
+}
+}
+
+RemovingAndAddingReplicas(List removing, List adding) {
+this.removing = new HashSet<>(removing);
+this.adding = new HashSet<>(adding);
+}
+
+/**
+ * Calculate what replicas need to be added and removed to reach a 
specific target
+ * replica list.
+ *
+ * @param currentReplicas   The current replica list.
+ * @param targetReplicasThe target replica list.
+ *
+ * @return  An object containing the removing and adding 
replicas.
+ */
+static RemovingAndAddingReplicas forTarget(List currentReplicas,
+   List targetReplicas) {
+List removingReplicas = new ArrayList<>();
+List addingReplicas = new ArrayList<>();
+List sortedCurrentReplicas = new ArrayList<>(currentReplicas);
+sortedCurrentReplicas.sort(Integer::compareTo);
+List sortedTargetReplicas = new ArrayList<>(targetReplicas);
+sortedTargetReplicas.sort(Integer::compareTo);
+int i = 0, j = 0;
+while (true) {
+if (i < sortedCurrentReplicas.size()) {
+int currentReplica = sortedCurrentReplicas.get(i);
+if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+if (currentReplica < targetReplica) {
+removingReplicas.add(currentReplica);
+i++;
+} else if (currentReplica > targetReplica) {
+addingReplicas.add(targetReplica);
+j++;
+} else {
+i++;
+j++;
+}
+} else {
+removingReplicas.add(currentReplica);
+i++;
+}
+} else if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+addingReplicas.add(targetReplica);
+j++;
+} else {
+break;
+}
+}
+return new RemovingAndAddingReplicas(removingReplicas, addingReplicas);
+}
+
+/**
+ * Calculate the merged replica list following a reassignment.
+ *
+ * The merged list will contain all of the target replicas, in the order 
they appear
+ * in the target list.  It will also contain existing replicas that are 
scheduled to
+ * be removed.
+ *
+ * If a removing replica was in position X in the original replica list, 
it will
+ * appear in the merged list following the appearance of X non-new 
replicas.
+ *
+ * @param currentReplicas   The current replica list.
+ * @param targetReplicasThe target replica list.
+ *
+ * @return  The merged replica list.
+ */
+List calculateMergedReplicas(List currentReplicas,

Review comment:
   The issue is that we don't always have both a current and a target replica list. 
For example, if we're cancelling a reassignment, we only have the current list.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above 

[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-09 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667232604



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+
+class RemovingAndAddingReplicas {
+private final Set removing;
+private final Set adding;
+
+RemovingAndAddingReplicas(int[] removing, int[] adding) {
+this.removing = new HashSet<>();
+for (int replica : removing) {
+this.removing.add(replica);
+}
+this.adding = new HashSet<>();
+for (int replica : adding) {
+this.adding.add(replica);
+}
+}
+
+RemovingAndAddingReplicas(List removing, List adding) {
+this.removing = new HashSet<>(removing);
+this.adding = new HashSet<>(adding);
+}
+
+/**
+ * Calculate what replicas need to be added and removed to reach a 
specific target
+ * replica list.
+ *
+ * @param currentReplicas   The current replica list.
+ * @param targetReplicasThe target replica list.
+ *
+ * @return  An object containing the removing and adding 
replicas.
+ */
+static RemovingAndAddingReplicas forTarget(List currentReplicas,
+   List targetReplicas) {
+List removingReplicas = new ArrayList<>();
+List addingReplicas = new ArrayList<>();
+List sortedCurrentReplicas = new ArrayList<>(currentReplicas);
+sortedCurrentReplicas.sort(Integer::compareTo);
+List sortedTargetReplicas = new ArrayList<>(targetReplicas);
+sortedTargetReplicas.sort(Integer::compareTo);
+int i = 0, j = 0;
+while (true) {
+if (i < sortedCurrentReplicas.size()) {
+int currentReplica = sortedCurrentReplicas.get(i);
+if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+if (currentReplica < targetReplica) {
+removingReplicas.add(currentReplica);
+i++;
+} else if (currentReplica > targetReplica) {
+addingReplicas.add(targetReplica);
+j++;
+} else {
+i++;
+j++;
+}
+} else {
+removingReplicas.add(currentReplica);
+i++;
+}
+} else if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+addingReplicas.add(targetReplica);
+j++;
+} else {
+break;
+}
+}
+return new RemovingAndAddingReplicas(removingReplicas, addingReplicas);
+}
+
+/**
+ * Calculate the merged replica list following a reassignment.
+ *
+ * The merged list will contain all of the target replicas, in the order 
they appear
+ * in the target list.  It will also contain existing replicas that are 
scheduled to
+ * be removed.
+ *
+ * If a removing replica was in position X in the original replica list, 
it will
+ * appear in the merged list following the appearance of X non-new 
replicas.

Review comment:
   In the latest change, I have split this class into 
`PartitionReassignmentReplicas` and `PartitionReassignmentRevert`, which are 
both a lot simpler than `RemovingAndAddingReplicas` was.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-09 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667204243



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
 }
 }
 
+PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+ int partitionId,
+ PartitionControlInfo 
partition,
+ int[] newIsr,
+ Function isAcceptableLeader) {
+PartitionChangeRecord record = new PartitionChangeRecord().
+setTopicId(topic.id).
+setPartitionId(partitionId);
+int[] newReplicas = partition.replicas;
+if (partition.isrChangeCompletesReassignment(newIsr)) {
+if (partition.addingReplicas.length > 0) {
+record.setAddingReplicas(Collections.emptyList());
+}
+if (partition.removingReplicas.length > 0) {
+record.setRemovingReplicas(Collections.emptyList());
+newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+}
+}
+int newLeader;
+if (Replicas.contains(newIsr, partition.leader)) {
+// If the current leader is good, don't change.
+newLeader = partition.leader;
+} else {
+// Choose a new leader.
+boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+}
+if (!electionWasClean(newLeader, newIsr)) {
+// After an unclean leader election, the ISR is reset to just the 
new leader.
+newIsr = new int[] {newLeader};
+} else if (newIsr.length == 0) {
+// We never want to shrink the ISR to size 0.
+newIsr = partition.isr;
+}
+if (newLeader != partition.leader) record.setLeader(newLeader);
+if (!Arrays.equals(newIsr, partition.isr)) {
+record.setIsr(Replicas.toList(newIsr));
+}
+if (!Arrays.equals(newReplicas, partition.replicas)) {
+record.setReplicas(Replicas.toList(newReplicas));
+}
+return record;
+}
+
+ControllerResult
+alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+List records = new ArrayList<>();
+AlterPartitionReassignmentsResponseData result =
+new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+int successfulAlterations = 0, totalAlterations = 0;
+for (ReassignableTopic topic : request.topics()) {
+ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+setName(topic.name());
+for (ReassignablePartition partition : topic.partitions()) {
+ApiError error = ApiError.NONE;
+try {
+alterPartitionReassignment(topic.name(), partition, 
records);
+successfulAlterations++;
+} catch (Throwable e) {
+log.info("Unable to alter partition reassignment for " +
+topic.name() + ":" + partition.partitionIndex() + " 
because " +
+"of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+error = ApiError.fromThrowable(e);
+}
+totalAlterations++;
+topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+setPartitionIndex(partition.partitionIndex()).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()));
+}
+result.responses().add(topicResponse);
+}
+log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+successfulAlterations, totalAlterations);
+return ControllerResult.atomicOf(records, result);
+}
+
+void alterPartitionReassignment(String topicName,
+ReassignablePartition partition,
+List records) {
+Uuid topicId = topicsByName.get(topicName);
+if (topicId == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+"named " + topicName + ".");
+}
+TopicControlInfo topicInfo = topics.get(topicId);
+if (topicInfo == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" 

[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-08 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r665799816



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+
+class RemovingAndAddingReplicas {
+private final Set removing;
+private final Set adding;
+
+RemovingAndAddingReplicas(int[] removing, int[] adding) {
+this.removing = new HashSet<>();
+for (int replica : removing) {
+this.removing.add(replica);
+}
+this.adding = new HashSet<>();
+for (int replica : adding) {
+this.adding.add(replica);
+}
+}
+
+RemovingAndAddingReplicas(List removing, List adding) {
+this.removing = new HashSet<>(removing);
+this.adding = new HashSet<>(adding);
+}
+
+/**
+ * Calculate what replicas need to be added and removed to reach a 
specific target
+ * replica list.
+ *
+ * @param currentReplicas   The current replica list.
+ * @param targetReplicasThe target replica list.
+ *
+ * @return  An object containing the removing and adding 
replicas.
+ */
+static RemovingAndAddingReplicas forTarget(List currentReplicas,
+   List targetReplicas) {
+List removingReplicas = new ArrayList<>();
+List addingReplicas = new ArrayList<>();
+List sortedCurrentReplicas = new ArrayList<>(currentReplicas);
+sortedCurrentReplicas.sort(Integer::compareTo);
+List sortedTargetReplicas = new ArrayList<>(targetReplicas);
+sortedTargetReplicas.sort(Integer::compareTo);
+int i = 0, j = 0;
+while (true) {
+if (i < sortedCurrentReplicas.size()) {
+int currentReplica = sortedCurrentReplicas.get(i);
+if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+if (currentReplica < targetReplica) {
+removingReplicas.add(currentReplica);
+i++;
+} else if (currentReplica > targetReplica) {
+addingReplicas.add(targetReplica);
+j++;
+} else {
+i++;
+j++;
+}
+} else {
+removingReplicas.add(currentReplica);
+i++;
+}
+} else if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+addingReplicas.add(targetReplica);
+j++;
+} else {
+break;
+}
+}
+return new RemovingAndAddingReplicas(removingReplicas, addingReplicas);
+}
+
+/**
+ * Calculate the merged replica list following a reassignment.
+ *
+ * The merged list will contain all of the target replicas, in the order 
they appear
+ * in the target list.  It will also contain existing replicas that are 
scheduled to
+ * be removed.
+ *
+ * If a removing replica was in position X in the original replica list, 
it will
+ * appear in the merged list following the appearance of X non-new 
replicas.

Review comment:
   I will take a look in a bit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-07 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r665801699



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -343,12 +409,21 @@ public String toString() {
 this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
 this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
 this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
+this.reassigningTopics = new TimelineHashMap<>(snapshotRegistry, 0);
+this.reassigningPartitionCount = new TimelineInteger(snapshotRegistry);
 }
 
 public void replay(TopicRecord record) {
-topicsByName.put(record.name(), record.topicId());
-topics.put(record.topicId(),
+Uuid prevTopicId = topicsByName.get(record.name());
+if (prevTopicId != null) {
+replay(new RemoveTopicRecord().setTopicId(prevTopicId));
+}
+TopicControlInfo prevTopic = topics.put(record.topicId(),
 new TopicControlInfo(record.name(), snapshotRegistry, 
record.topicId()));
+if (prevTopic != null) {

Review comment:
   I'm going to take this part out for now so that this PR can focus on just the 
reassignment changes.

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
 }
 }
 
+PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+ int partitionId,
+ PartitionControlInfo 
partition,
+ int[] newIsr,
+ Function isAcceptableLeader) {
+PartitionChangeRecord record = new PartitionChangeRecord().
+setTopicId(topic.id).
+setPartitionId(partitionId);
+int[] newReplicas = partition.replicas;
+if (partition.isrChangeCompletesReassignment(newIsr)) {
+if (partition.addingReplicas.length > 0) {
+record.setAddingReplicas(Collections.emptyList());
+}
+if (partition.removingReplicas.length > 0) {
+record.setRemovingReplicas(Collections.emptyList());
+newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+}
+}
+int newLeader;
+if (Replicas.contains(newIsr, partition.leader)) {
+// If the current leader is good, don't change.
+newLeader = partition.leader;
+} else {
+// Choose a new leader.
+boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+}
+if (!electionWasClean(newLeader, newIsr)) {
+// After an unclean leader election, the ISR is reset to just the 
new leader.
+newIsr = new int[] {newLeader};
+} else if (newIsr.length == 0) {
+// We never want to shrink the ISR to size 0.
+newIsr = partition.isr;
+}
+if (newLeader != partition.leader) record.setLeader(newLeader);
+if (!Arrays.equals(newIsr, partition.isr)) {
+record.setIsr(Replicas.toList(newIsr));
+}
+if (!Arrays.equals(newReplicas, partition.replicas)) {
+record.setReplicas(Replicas.toList(newReplicas));
+}
+return record;
+}
+
+ControllerResult
+alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+List records = new ArrayList<>();
+AlterPartitionReassignmentsResponseData result =
+new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+int successfulAlterations = 0, totalAlterations = 0;
+for (ReassignableTopic topic : request.topics()) {
+ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+setName(topic.name());
+for (ReassignablePartition partition : topic.partitions()) {
+ApiError error = ApiError.NONE;
+try {
+alterPartitionReassignment(topic.name(), partition, 
records);
+successfulAlterations++;
+} catch (Throwable e) {
+log.info("Unable to alter partition reassignment for " +
+topic.name() + ":" + partition.partitionIndex() + " 
because " +
+"of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+error = ApiError.fromThrowable(e);
+}
+ 

[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-07 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r665801598



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -235,6 +256,41 @@ int preferredReplica() {
 return replicas.length == 0 ? NO_LEADER : replicas[0];
 }
 
+/**
+ * Report whether this partition is in the middle of a reassignment.
+ *
+ * @return  True if any replicas are still listed as being added or removed.
+ */
+boolean isReassigning() {
+boolean nothingPending = removingReplicas.length == 0 && addingReplicas.length == 0;
+return !nothingPending;
+}
+
+/**
+ * Check if an ISR change completes this partition's reassignment.
+ *
+ * @param newIsrThe new ISR.
+ * @return  True if the reassignment is complete.
+ */
+boolean isrChangeCompletesReassignment(int[] newIsr) {

Review comment:
   We haven't implemented auto leader balancing yet, so we're OK here...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-07 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r665801448



##
File path: metadata/src/main/resources/common/metadata/PartitionRecord.json
##
@@ -27,9 +27,9 @@
   "about": "The replicas of this partition, sorted by preferred order." },
 { "name": "Isr", "type":  "[]int32", "versions":  "0+",
   "about": "The in-sync replicas of this partition" },
-{ "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", 
"nullableVersions": "0+", "entityType": "brokerId",
+{ "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", 
"entityType": "brokerId",

Review comment:
   This should be addressed by the recent changes in trunk.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-07 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r665801100



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+
+class RemovingAndAddingReplicas {
+private final Set removing;
+private final Set adding;
+
+/**
+ * Build the removing and adding sets from replica ID arrays. Duplicate IDs within
+ * either array collapse into a single set entry.
+ *
+ * @param removing  The replicas being removed.
+ * @param adding    The replicas being added.
+ */
+RemovingAndAddingReplicas(int[] removing, int[] adding) {
+this.removing = new HashSet<>();
+for (int idx = 0; idx < removing.length; idx++) {
+this.removing.add(removing[idx]);
+}
+this.adding = new HashSet<>();
+for (int idx = 0; idx < adding.length; idx++) {
+this.adding.add(adding[idx]);
+}
+}
+
+/**
+ * Build the removing and adding sets from replica ID lists. Duplicate IDs within
+ * either list collapse into a single set entry.
+ *
+ * @param removing  The replicas being removed.
+ * @param adding    The replicas being added.
+ */
+RemovingAndAddingReplicas(List removing, List adding) {
+this.removing = new HashSet<>();
+this.removing.addAll(removing);
+this.adding = new HashSet<>();
+this.adding.addAll(adding);
+}
+
+/**
+ * Calculate what replicas need to be added and removed to reach a specific target
+ * replica list.
+ *
+ * Sorted copies of both lists are walked in a single merge pass. A replica that
+ * appears only in the current list goes into the removing list, and a replica that
+ * appears only in the target list goes into the adding list; replicas present in
+ * both are unchanged. If a replica ID occurs more times in one list than in the
+ * other, the extra occurrences are also reported as removing or adding. The input
+ * lists themselves are not modified.
+ *
+ * @param currentReplicas   The current replica list.
+ * @param targetReplicas    The target replica list.
+ *
+ * @return                  An object containing the removing and adding replicas.
+ */
+static RemovingAndAddingReplicas forTarget(List currentReplicas,
+   List targetReplicas) {
+List removingReplicas = new ArrayList<>();
+List addingReplicas = new ArrayList<>();
+// Sort copies in ascending order so both lists can be compared in one merge pass.
+List sortedCurrentReplicas = new ArrayList<>(currentReplicas);
+sortedCurrentReplicas.sort(Integer::compareTo);
+List sortedTargetReplicas = new ArrayList<>(targetReplicas);
+sortedTargetReplicas.sort(Integer::compareTo);
+// i indexes sortedCurrentReplicas; j indexes sortedTargetReplicas.
+int i = 0, j = 0;
+while (true) {
+if (i < sortedCurrentReplicas.size()) {
+int currentReplica = sortedCurrentReplicas.get(i);
+if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+if (currentReplica < targetReplica) {
+// All remaining targets are larger, so currentReplica has no match left.
+removingReplicas.add(currentReplica);
+i++;
+} else if (currentReplica > targetReplica) {
+// All remaining current replicas are larger, so targetReplica has no match left.
+addingReplicas.add(targetReplica);
+j++;
+} else {
+// Present in both lists: nothing to add or remove.
+i++;
+j++;
+}
+} else {
+// Target list exhausted: the remaining current replicas are all being removed.
+removingReplicas.add(currentReplica);
+i++;
+}
+} else if (j < sortedTargetReplicas.size()) {
+// Current list exhausted: the remaining target replicas are all being added.
+int targetReplica = sortedTargetReplicas.get(j);
+addingReplicas.add(targetReplica);
+j++;
+} else {
+// Both lists exhausted.
+break;
+}
+}
+return new RemovingAndAddingReplicas(removingReplicas, addingReplicas);
+}
+
+/**
+ * Calculate the merged replica list following a reassignment.
+ *
+ * The merged list will contain all of the target replicas, in the order 
they appear
+ * in the target list.  It will also contain existing replicas that are 
scheduled to
+ * be removed.
+ *
+ * If a removing replica was in position X in the original replica list, 
it will
+ * appear in the merged list following the appearance of X non-new 
replicas.
+ *
+ * @param currentReplicas   The current replica list.
+ * @param targetReplicasThe target replica list.
+ *
+ * @return  The merged replica list.
+ */
+List calculateMergedReplicas(List currentReplicas,

Review comment:
   The issue is that we don't always have both a current and a target replica list. For example, if we're cancelling a reassignment, we only have the current one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-07 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r665799816



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+
+class RemovingAndAddingReplicas {
+private final Set removing;
+private final Set adding;
+
+/**
+ * Build the removing and adding sets from replica ID arrays. Duplicate IDs within
+ * either array collapse into a single set entry.
+ *
+ * @param removing  The replicas being removed.
+ * @param adding    The replicas being added.
+ */
+RemovingAndAddingReplicas(int[] removing, int[] adding) {
+this.removing = new HashSet<>();
+for (int idx = 0; idx < removing.length; idx++) {
+this.removing.add(removing[idx]);
+}
+this.adding = new HashSet<>();
+for (int idx = 0; idx < adding.length; idx++) {
+this.adding.add(adding[idx]);
+}
+}
+
+/**
+ * Build the removing and adding sets from replica ID lists. Duplicate IDs within
+ * either list collapse into a single set entry.
+ *
+ * @param removing  The replicas being removed.
+ * @param adding    The replicas being added.
+ */
+RemovingAndAddingReplicas(List removing, List adding) {
+this.removing = new HashSet<>();
+this.removing.addAll(removing);
+this.adding = new HashSet<>();
+this.adding.addAll(adding);
+}
+
+/**
+ * Calculate what replicas need to be added and removed to reach a specific target
+ * replica list.
+ *
+ * Sorted copies of both lists are walked in a single merge pass. A replica that
+ * appears only in the current list goes into the removing list, and a replica that
+ * appears only in the target list goes into the adding list; replicas present in
+ * both are unchanged. If a replica ID occurs more times in one list than in the
+ * other, the extra occurrences are also reported as removing or adding. The input
+ * lists themselves are not modified.
+ *
+ * @param currentReplicas   The current replica list.
+ * @param targetReplicas    The target replica list.
+ *
+ * @return                  An object containing the removing and adding replicas.
+ */
+static RemovingAndAddingReplicas forTarget(List currentReplicas,
+   List targetReplicas) {
+List removingReplicas = new ArrayList<>();
+List addingReplicas = new ArrayList<>();
+// Sort copies in ascending order so both lists can be compared in one merge pass.
+List sortedCurrentReplicas = new ArrayList<>(currentReplicas);
+sortedCurrentReplicas.sort(Integer::compareTo);
+List sortedTargetReplicas = new ArrayList<>(targetReplicas);
+sortedTargetReplicas.sort(Integer::compareTo);
+// i indexes sortedCurrentReplicas; j indexes sortedTargetReplicas.
+int i = 0, j = 0;
+while (true) {
+if (i < sortedCurrentReplicas.size()) {
+int currentReplica = sortedCurrentReplicas.get(i);
+if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+if (currentReplica < targetReplica) {
+// All remaining targets are larger, so currentReplica has no match left.
+removingReplicas.add(currentReplica);
+i++;
+} else if (currentReplica > targetReplica) {
+// All remaining current replicas are larger, so targetReplica has no match left.
+addingReplicas.add(targetReplica);
+j++;
+} else {
+// Present in both lists: nothing to add or remove.
+i++;
+j++;
+}
+} else {
+// Target list exhausted: the remaining current replicas are all being removed.
+removingReplicas.add(currentReplica);
+i++;
+}
+} else if (j < sortedTargetReplicas.size()) {
+// Current list exhausted: the remaining target replicas are all being added.
+int targetReplica = sortedTargetReplicas.get(j);
+addingReplicas.add(targetReplica);
+j++;
+} else {
+// Both lists exhausted.
+break;
+}
+}
+return new RemovingAndAddingReplicas(removingReplicas, addingReplicas);
+}
+
+/**
+ * Calculate the merged replica list following a reassignment.
+ *
+ * The merged list will contain all of the target replicas, in the order 
they appear
+ * in the target list.  It will also contain existing replicas that are 
scheduled to
+ * be removed.
+ *
+ * If a removing replica was in position X in the original replica list, 
it will
+ * appear in the merged list following the appearance of X non-new 
replicas.

Review comment:
   I think this can be simplified. I will take a look in a bit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-07 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r665798854



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
 }
 }
 
+/**
+ * Build the PartitionChangeRecord that results from applying a new ISR to a partition.
+ *
+ * If partition.isrChangeCompletesReassignment(newIsr) is true, the record clears any
+ * non-empty adding/removing replica lists, and the removing replicas are dropped from
+ * both the new ISR and the replica list. The current leader is kept if it is still in
+ * the new ISR; otherwise bestLeader chooses one, given the topic's unclean leader
+ * election setting. If electionWasClean reports an unclean election, the ISR is reset
+ * to just the new leader; otherwise an empty ISR is replaced by the partition's
+ * current ISR. Leader, ISR and replicas are only set on the record when they differ
+ * from the partition's current values.
+ *
+ * @param topic               The topic that owns the partition.
+ * @param partitionId         The index of the partition within the topic.
+ * @param partition           The current state of the partition.
+ * @param newIsr              The proposed new ISR.
+ * @param isAcceptableLeader  Passed through to bestLeader; presumably filters which
+ *                            brokers may become leader (confirm against bestLeader).
+ *
+ * @return                    The change record. It may carry only the topic and
+ *                            partition IDs if nothing changed.
+ */
+PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+ int partitionId,
+ PartitionControlInfo 
partition,
+ int[] newIsr,
+ Function isAcceptableLeader) {
+PartitionChangeRecord record = new PartitionChangeRecord().
+setTopicId(topic.id).
+setPartitionId(partitionId);
+int[] newReplicas = partition.replicas;
+if (partition.isrChangeCompletesReassignment(newIsr)) {
+// This ISR change completes the reassignment: clear its adding/removing lists and
+// drop the replicas it was removing from both the ISR and the replica list.
+if (partition.addingReplicas.length > 0) {
+record.setAddingReplicas(Collections.emptyList());
+}
+if (partition.removingReplicas.length > 0) {
+record.setRemovingReplicas(Collections.emptyList());
+newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+}
+}
+int newLeader;
+if (Replicas.contains(newIsr, partition.leader)) {
+// If the current leader is good, don't change.
+newLeader = partition.leader;
+} else {
+// Choose a new leader.
+boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+}
+if (!electionWasClean(newLeader, newIsr)) {
+// After an unclean leader election, the ISR is reset to just the new leader.
+// NOTE(review): confirm bestLeader cannot return NO_LEADER on this path; otherwise
+// the resulting ISR would contain that sentinel value.
+newIsr = new int[] {newLeader};
+} else if (newIsr.length == 0) {
+// We never want to shrink the ISR to size 0.
+// NOTE(review): if the ISR only became empty because the removing replicas were
+// stripped above, this restores an ISR that still contains them while newReplicas
+// does not; confirm that case cannot occur.
+newIsr = partition.isr;
+}
+if (newLeader != partition.leader) record.setLeader(newLeader);
+// Only record the ISR and the replica list when they actually change.
+if (!Arrays.equals(newIsr, partition.isr)) {
+record.setIsr(Replicas.toList(newIsr));
+}
+if (!Arrays.equals(newReplicas, partition.replicas)) {
+record.setReplicas(Replicas.toList(newReplicas));
+}
+return record;
+}
+
+/**
+ * Handle an AlterPartitionReassignments request.
+ *
+ * Every requested partition is passed to alterPartitionReassignment together with a
+ * shared records list. A failure for one partition is logged at INFO level and turned
+ * into that partition's error code and message in the response; it does not stop the
+ * remaining partitions from being processed.
+ *
+ * @param request   The request listing the topics and partitions to reassign.
+ *
+ * @return          The shared records list and the response, combined via
+ *                  ControllerResult.atomicOf.
+ */
+ControllerResult
+alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+List records = new ArrayList<>();
+// The top-level error message stays null; errors are reported per partition below.
+AlterPartitionReassignmentsResponseData result =
+new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+int successfulAlterations = 0, totalAlterations = 0;
+for (ReassignableTopic topic : request.topics()) {
+ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+setName(topic.name());
+for (ReassignablePartition partition : topic.partitions()) {
+ApiError error = ApiError.NONE;
+try {
+alterPartitionReassignment(topic.name(), partition, 
records);
+successfulAlterations++;
+} catch (Throwable e) {
+// NOTE(review): any records alterPartitionReassignment appended before throwing
+// are still returned with the batch below; confirm it only appends once all of its
+// checks have passed. Catching Throwable also converts JVM Errors into a
+// per-partition error.
+log.info("Unable to alter partition reassignment for " +
+topic.name() + ":" + partition.partitionIndex() + " 
because " +
+"of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+error = ApiError.fromThrowable(e);
+}
+totalAlterations++;
+// Every partition gets a response entry; successful ones carry ApiError.NONE.
+topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+setPartitionIndex(partition.partitionIndex()).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()));
+}
+result.responses().add(topicResponse);
+}
+log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+successfulAlterations, totalAlterations);
+return ControllerResult.atomicOf(records, result);
+}
+
+void alterPartitionReassignment(String topicName,
+ReassignablePartition partition,
+List records) {
+Uuid topicId = topicsByName.get(topicName);
+if (topicId == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+"named " + topicName + ".");
+}
+TopicControlInfo topicInfo = topics.get(topicId);
+if (topicInfo == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
"