[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-06-03 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r644352228



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+
+class RemovingAndAddingReplicas {
+private final Set removing;
+private final Set adding;
+
+RemovingAndAddingReplicas(int[] removing, int[] adding) {
+this.removing = new HashSet<>();
+for (int replica : removing) {
+this.removing.add(replica);
+}
+this.adding = new HashSet<>();
+for (int replica : adding) {
+this.adding.add(replica);
+}
+}
+
+RemovingAndAddingReplicas(List removing, List adding) {
+this.removing = new HashSet<>(removing);
+this.adding = new HashSet<>(adding);
+}
+
+/**
+ * Calculate what replicas need to be added and removed to reach a 
specific target
+ * replica list.
+ *
+ * @param currentReplicas   The current replica list.
+ * @param targetReplicas    The target replica list.
+ *
+ * @return  An object containing the removing and adding 
replicas.
+ */
+static RemovingAndAddingReplicas forTarget(List currentReplicas,
+   List targetReplicas) {
+List removingReplicas = new ArrayList<>();
+List addingReplicas = new ArrayList<>();
+List sortedCurrentReplicas = new ArrayList<>(currentReplicas);
+sortedCurrentReplicas.sort(Integer::compareTo);
+List sortedTargetReplicas = new ArrayList<>(targetReplicas);
+sortedTargetReplicas.sort(Integer::compareTo);
+int i = 0, j = 0;
+while (true) {
+if (i < sortedCurrentReplicas.size()) {
+int currentReplica = sortedCurrentReplicas.get(i);
+if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+if (currentReplica < targetReplica) {
+removingReplicas.add(currentReplica);
+i++;
+} else if (currentReplica > targetReplica) {
+addingReplicas.add(targetReplica);
+j++;
+} else {
+i++;
+j++;
+}
+} else {
+removingReplicas.add(currentReplica);
+i++;
+}
+} else if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+addingReplicas.add(targetReplica);
+j++;
+} else {
+break;
+}
+}
+return new RemovingAndAddingReplicas(removingReplicas, addingReplicas);
+}
+
+/**
+ * Calculate the merged replica list following a reassignment.
+ *
+ * The merged list will contain all of the target replicas, in the order 
they appear
+ * in the target list.  It will also contain existing replicas that are 
scheduled to
+ * be removed.
+ *
+ * If a removing replica was in position X in the original replica list, 
it will
+ * appear in the merged list following the appearance of X non-new 
replicas.

Review comment:
   The logic seems a bit complicated, and I am not sure how useful it is. 
For example, if the current and target replicas are (1, 3, 4) and (1, 2, 4), 
this method returns (1, 3, 2, 4). The old code calculates the merged replicas 
simply as (target ++ originReplicas).distinct, so it would return (1, 2, 4, 3). 
Is there any benefit to the new way of merging replicas?
   

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
 }
  

[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-06-04 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r644352228



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+
+class RemovingAndAddingReplicas {
+private final Set removing;
+private final Set adding;
+
+RemovingAndAddingReplicas(int[] removing, int[] adding) {
+this.removing = new HashSet<>();
+for (int replica : removing) {
+this.removing.add(replica);
+}
+this.adding = new HashSet<>();
+for (int replica : adding) {
+this.adding.add(replica);
+}
+}
+
+RemovingAndAddingReplicas(List removing, List adding) {
+this.removing = new HashSet<>(removing);
+this.adding = new HashSet<>(adding);
+}
+
+/**
+ * Calculate what replicas need to be added and removed to reach a 
specific target
+ * replica list.
+ *
+ * @param currentReplicas   The current replica list.
+ * @param targetReplicas    The target replica list.
+ *
+ * @return  An object containing the removing and adding 
replicas.
+ */
+static RemovingAndAddingReplicas forTarget(List currentReplicas,
+   List targetReplicas) {
+List removingReplicas = new ArrayList<>();
+List addingReplicas = new ArrayList<>();
+List sortedCurrentReplicas = new ArrayList<>(currentReplicas);
+sortedCurrentReplicas.sort(Integer::compareTo);
+List sortedTargetReplicas = new ArrayList<>(targetReplicas);
+sortedTargetReplicas.sort(Integer::compareTo);
+int i = 0, j = 0;
+while (true) {
+if (i < sortedCurrentReplicas.size()) {
+int currentReplica = sortedCurrentReplicas.get(i);
+if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+if (currentReplica < targetReplica) {
+removingReplicas.add(currentReplica);
+i++;
+} else if (currentReplica > targetReplica) {
+addingReplicas.add(targetReplica);
+j++;
+} else {
+i++;
+j++;
+}
+} else {
+removingReplicas.add(currentReplica);
+i++;
+}
+} else if (j < sortedTargetReplicas.size()) {
+int targetReplica = sortedTargetReplicas.get(j);
+addingReplicas.add(targetReplica);
+j++;
+} else {
+break;
+}
+}
+return new RemovingAndAddingReplicas(removingReplicas, addingReplicas);
+}
+
+/**
+ * Calculate the merged replica list following a reassignment.
+ *
+ * The merged list will contain all of the target replicas, in the order 
they appear
+ * in the target list.  It will also contain existing replicas that are 
scheduled to
+ * be removed.
+ *
+ * If a removing replica was in position X in the original replica list, 
it will
+ * appear in the merged list following the appearance of X non-new 
replicas.

Review comment:
   The logic seems a bit complicated, and I am not sure how useful it is. 
For example, if the current and target replicas are (1, 3, 4) and (1, 2, 4), 
this method returns (1, 3, 2, 4). The old code calculates the merged replicas 
simply as (target ++ originReplicas).distinct, so it would return (1, 2, 4, 3). 
Is there any benefit to the new way of merging replicas?
   

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
 }
  

[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-08 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r666339593



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
 }
 }
 
+PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+ int partitionId,
+ PartitionControlInfo 
partition,
+ int[] newIsr,
+ Function isAcceptableLeader) {
+PartitionChangeRecord record = new PartitionChangeRecord().
+setTopicId(topic.id).
+setPartitionId(partitionId);
+int[] newReplicas = partition.replicas;
+if (partition.isrChangeCompletesReassignment(newIsr)) {
+if (partition.addingReplicas.length > 0) {
+record.setAddingReplicas(Collections.emptyList());
+}
+if (partition.removingReplicas.length > 0) {
+record.setRemovingReplicas(Collections.emptyList());
+newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+}
+}
+int newLeader;
+if (Replicas.contains(newIsr, partition.leader)) {
+// If the current leader is good, don't change.
+newLeader = partition.leader;
+} else {
+// Choose a new leader.
+boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+}
+if (!electionWasClean(newLeader, newIsr)) {
+// After an unclean leader election, the ISR is reset to just the 
new leader.
+newIsr = new int[] {newLeader};
+} else if (newIsr.length == 0) {
+// We never want to shrink the ISR to size 0.
+newIsr = partition.isr;
+}
+if (newLeader != partition.leader) record.setLeader(newLeader);
+if (!Arrays.equals(newIsr, partition.isr)) {
+record.setIsr(Replicas.toList(newIsr));
+}
+if (!Arrays.equals(newReplicas, partition.replicas)) {
+record.setReplicas(Replicas.toList(newReplicas));
+}
+return record;
+}
+
+ControllerResult
+alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+List records = new ArrayList<>();
+AlterPartitionReassignmentsResponseData result =
+new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+int successfulAlterations = 0, totalAlterations = 0;
+for (ReassignableTopic topic : request.topics()) {
+ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+setName(topic.name());
+for (ReassignablePartition partition : topic.partitions()) {
+ApiError error = ApiError.NONE;
+try {
+alterPartitionReassignment(topic.name(), partition, 
records);
+successfulAlterations++;
+} catch (Throwable e) {
+log.info("Unable to alter partition reassignment for " +
+topic.name() + ":" + partition.partitionIndex() + " 
because " +
+"of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+error = ApiError.fromThrowable(e);
+}
+totalAlterations++;
+topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+setPartitionIndex(partition.partitionIndex()).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()));
+}
+result.responses().add(topicResponse);
+}
+log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+successfulAlterations, totalAlterations);
+return ControllerResult.atomicOf(records, result);
+}
+
+void alterPartitionReassignment(String topicName,
+ReassignablePartition partition,
+List records) {
+Uuid topicId = topicsByName.get(topicName);
+if (topicId == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+"named " + topicName + ".");
+}
+TopicControlInfo topicInfo = topics.get(topicId);
+if (topicInfo == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +

[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-09 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667223580



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
 }
 }
 
+PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+ int partitionId,
+ PartitionControlInfo 
partition,
+ int[] newIsr,
+ Function isAcceptableLeader) {
+PartitionChangeRecord record = new PartitionChangeRecord().
+setTopicId(topic.id).
+setPartitionId(partitionId);
+int[] newReplicas = partition.replicas;
+if (partition.isrChangeCompletesReassignment(newIsr)) {
+if (partition.addingReplicas.length > 0) {
+record.setAddingReplicas(Collections.emptyList());
+}
+if (partition.removingReplicas.length > 0) {
+record.setRemovingReplicas(Collections.emptyList());
+newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+}
+}
+int newLeader;
+if (Replicas.contains(newIsr, partition.leader)) {
+// If the current leader is good, don't change.
+newLeader = partition.leader;
+} else {
+// Choose a new leader.
+boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+}
+if (!electionWasClean(newLeader, newIsr)) {
+// After an unclean leader election, the ISR is reset to just the 
new leader.
+newIsr = new int[] {newLeader};
+} else if (newIsr.length == 0) {
+// We never want to shrink the ISR to size 0.
+newIsr = partition.isr;
+}
+if (newLeader != partition.leader) record.setLeader(newLeader);
+if (!Arrays.equals(newIsr, partition.isr)) {
+record.setIsr(Replicas.toList(newIsr));
+}
+if (!Arrays.equals(newReplicas, partition.replicas)) {
+record.setReplicas(Replicas.toList(newReplicas));
+}
+return record;
+}
+
+ControllerResult
+alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+List records = new ArrayList<>();
+AlterPartitionReassignmentsResponseData result =
+new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+int successfulAlterations = 0, totalAlterations = 0;
+for (ReassignableTopic topic : request.topics()) {
+ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+setName(topic.name());
+for (ReassignablePartition partition : topic.partitions()) {
+ApiError error = ApiError.NONE;
+try {
+alterPartitionReassignment(topic.name(), partition, 
records);
+successfulAlterations++;
+} catch (Throwable e) {
+log.info("Unable to alter partition reassignment for " +
+topic.name() + ":" + partition.partitionIndex() + " 
because " +
+"of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+error = ApiError.fromThrowable(e);
+}
+totalAlterations++;
+topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+setPartitionIndex(partition.partitionIndex()).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()));
+}
+result.responses().add(topicResponse);
+}
+log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+successfulAlterations, totalAlterations);
+return ControllerResult.atomicOf(records, result);
+}
+
+void alterPartitionReassignment(String topicName,
+ReassignablePartition partition,
+List records) {
+Uuid topicId = topicsByName.get(topicName);
+if (topicId == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+"named " + topicName + ".");
+}
+TopicControlInfo topicInfo = topics.get(topicId);
+if (topicInfo == null) {
+throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +

[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-13 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r668323818



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionChangeBuilder handles changing partition registrations.
+ */
+public class PartitionChangeBuilder {
+public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
+if (record.isr() != null) return false;
+if (record.leader() != NO_LEADER_CHANGE) return false;
+if (record.replicas() != null) return false;
+if (record.removingReplicas() != null) return false;
+if (record.addingReplicas() != null) return false;
+return true;
+}
+
+private final PartitionRegistration partition;
+private final Uuid topicId;
+private final int partitionId;
+private final Function isAcceptableLeader;
+private final Supplier uncleanElectionOk;
+private List targetIsr;
+private List targetReplicas;
+private List targetRemoving;
+private List targetAdding;
+private boolean alwaysElectPreferredIfPossible;
+
+public PartitionChangeBuilder(PartitionRegistration partition,
+  Uuid topicId,
+  int partitionId,
+  Function 
isAcceptableLeader,
+  Supplier uncleanElectionOk) {
+this.partition = partition;
+this.topicId = topicId;
+this.partitionId = partitionId;
+this.isAcceptableLeader = isAcceptableLeader;
+this.uncleanElectionOk = uncleanElectionOk;
+this.targetIsr = Replicas.toList(partition.isr);
+this.targetReplicas = Replicas.toList(partition.replicas);
+this.targetRemoving = Replicas.toList(partition.removingReplicas);
+this.targetAdding = Replicas.toList(partition.addingReplicas);
+this.alwaysElectPreferredIfPossible = false;
+}
+
+public PartitionChangeBuilder setTargetIsr(List targetIsr) {
+this.targetIsr = targetIsr;
+return this;
+}
+
+public PartitionChangeBuilder setTargetReplicas(List 
targetReplicas) {
+this.targetReplicas = targetReplicas;
+return this;
+}
+
+public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean 
alwaysElectPreferredIfPossible) {

Review comment:
   Do we need this since it's only used in tests?

##
File path: 
metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
##
@@ -105,4 +105,42 @@ public void testToLeaderAndIsrPartitionState() {
 setIsNew(false).toString(),
 b.toLeaderAndIsrPartitionState(new TopicPartition("bar", 0), 
false).toString());
 }
+
+@Test
+public void testMergePartitionChangeRecordWithReassignmentData() {
+PartitionRegistration partition0 = new PartitionRegistration(new int[] 
{1, 2, 3},
+new int[] {1, 2, 3}, Replicas.NONE, Replicas.NONE, 1, 100, 200);
+PartitionRegistration partition1 = partition0.merge(new 
PartitionChangeRecord().
+setRemovingReplicas(Collections.singletonList(3)).
+setAddingReplicas(Collections.singletonList(4)).
+setReplicas(Arrays.asList(1, 2, 3, 4)));
+assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4},
+new int[] {

[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-13 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669155613



##
File path: metadata/src/test/java/org/apache/kafka/metadata/ReplicasTest.java
##
@@ -88,9 +90,34 @@ public void testCopyWithout() {
 assertArrayEquals(new int[] {4, 1}, Replicas.copyWithout(new int[] {4, 
2, 2, 1}, 2));
 }
 
+@Test
+public void testCopyWithout2() {
+assertArrayEquals(new int[] {}, Replicas.copyWithout(new int[] {}, new 
int[] {}));
+assertArrayEquals(new int[] {}, Replicas.copyWithout(new int[] {1}, 
new int[] {1}));
+assertArrayEquals(new int[] {1, 3},
+Replicas.copyWithout(new int[] {1, 2, 3}, new int[]{2, 4}));
+assertArrayEquals(new int[] {4},
+Replicas.copyWithout(new int[] {4, 2, 2, 1}, new int[]{2, 1}));
+}
+
 @Test
 public void testCopyWith() {
 assertArrayEquals(new int[] {-1}, Replicas.copyWith(new int[] {}, -1));
 assertArrayEquals(new int[] {1, 2, 3, 4}, Replicas.copyWith(new int[] 
{1, 2, 3}, 4));
 }
+
+@Test
+public void testToSet() {
+assertEquals(Collections.emptySet(), Replicas.toSet(new int[] {}));
+assertEquals(new HashSet<>(Arrays.asList(3, 1, 5)),

Review comment:
   Could we add a test where the input has duplicates?

##
File path: 
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##
@@ -763,17 +747,227 @@ public void testValidateBadManualPartitionAssignments() 
throws Exception {
 OptionalInt.of(3))).getMessage());
 }
 
+private final static ListPartitionReassignmentsResponseData 
NONE_REASSIGNING =
+new ListPartitionReassignmentsResponseData().setErrorMessage(null);
+
 @Test
-public void testBestLeader() {
-assertEquals(2, ReplicationControlManager.bestLeader(
-new int[]{1, 2, 3, 4}, new int[]{4, 2, 3}, false, __ -> true));
-assertEquals(3, ReplicationControlManager.bestLeader(
-new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, __ -> true));
-assertEquals(4, ReplicationControlManager.bestLeader(
-new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, r -> r == 4));
-assertEquals(-1, ReplicationControlManager.bestLeader(
-new int[]{3, 4, 5}, new int[]{1, 2}, false, r -> r == 4));
-assertEquals(4, ReplicationControlManager.bestLeader(
-new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4));
+public void testReassignPartitions() throws Exception {
+ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+ReplicationControlManager replication = ctx.replicationControl;
+ctx.registerBrokers(0, 1, 2, 3);
+ctx.unfenceBrokers(0, 1, 2, 3);
+Uuid fooId = ctx.createTestTopic("foo", new int[][] {
+new int[] {1, 2, 3}, new int[] {3, 2, 1}}).topicId();
+ctx.createTestTopic("bar", new int[][] {
+new int[] {1, 2, 3}}).topicId();
+assertEquals(NONE_REASSIGNING, 
replication.listPartitionReassignments(null));
+ControllerResult alterResult =
+replication.alterPartitionReassignments(
+new 
AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
+new 
ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+new ReassignablePartition().setPartitionIndex(0).
+setReplicas(Arrays.asList(3, 2, 1)),
+new ReassignablePartition().setPartitionIndex(1).
+setReplicas(Arrays.asList(0, 2, 1)),
+new ReassignablePartition().setPartitionIndex(2).
+setReplicas(Arrays.asList(0, 2, 1,
+new ReassignableTopic().setName("bar";
+assertEquals(new AlterPartitionReassignmentsResponseData().
+setErrorMessage(null).setResponses(Arrays.asList(
+new 
ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+new 
ReassignablePartitionResponse().setPartitionIndex(0).
+setErrorMessage(null),
+new 
ReassignablePartitionResponse().setPartitionIndex(1).
+setErrorMessage(null),
+new 
ReassignablePartitionResponse().setPartitionIndex(2).
+setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+setErrorMessage("Unable to find partition 
foo:2."))),
+new ReassignableTopicResponse().
+setName("bar"))),
+alterResult.response());
+ctx.replay(alterResult.records());
+ListPartitionReassignmentsResponseData currentReassigning =
+new ListPartitionReassignmentsResponseData().setErrorMessage(null).
+

[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-13 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669208386



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+private final List replicas;
+private final List isr;
+
+PartitionReassignmentRevert(PartitionRegistration registration) {
+// Figure out the replica list and ISR that we will have after 
reverting the
+// reassignment. In general, we want to take out any replica that the 
reassignment
+// was adding, but keep the ones the reassignment was removing. (But 
see the
+// special case below.)
+Set adding = Replicas.toSet(registration.addingReplicas);
+this.replicas = new ArrayList<>(registration.replicas.length);
+this.isr = new ArrayList<>(registration.isr.length);
+for (int i = 0; i < registration.isr.length; i++) {
+int replica = registration.isr[i];
+if (!adding.contains(replica)) {
+this.isr.add(replica);
+} else if (i == registration.isr.length - 1 && isr.isEmpty()) {
+// This is a special case where taking out all the "adding" 
replicas is
+// not possible. The reason it is not possible is that doing 
so would
+// create an empty ISR, which is not allowed.
+//
+// In this case, we leave in one of the adding replicas 
permanently.

Review comment:
   One weird side effect of doing this is that it can change the 
replication factor for a partition to be different from the original one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669998610



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+private final List replicas;
+private final List isr;
+private final boolean unclean;
+
+PartitionReassignmentRevert(PartitionRegistration registration) {
+// Figure out the replica list and ISR that we will have after 
reverting the
+// reassignment. In general, we want to take out any replica that the 
reassignment
+// was adding, but keep the ones the reassignment was removing. (But 
see the
+// special case below.)
+Set adding = Replicas.toSet(registration.addingReplicas);
+this.replicas = new ArrayList<>(registration.replicas.length);
+this.isr = new ArrayList<>(registration.isr.length);
+for (int i = 0; i < registration.isr.length; i++) {
+int replica = registration.isr[i];
+if (!adding.contains(replica)) {
+this.isr.add(replica);
+}
+}
+for (int replica : registration.replicas) {
+if (!adding.contains(replica)) {
+this.replicas.add(replica);
+}
+}
+if (isr.isEmpty()) {
+// In the special case that all the replicas that are in the ISR 
are also
+// contained in addingReplicas, we choose the first remaining 
replica and add
+// it to the ISR. This is considered an unclean leader election. 
Therefore,
+// calling code must check that unclean leader election is enabled 
before
+// accepting the new ISR.
+if (this.replicas.isEmpty()) {
+// This should not be reachable, since it would require a 
partition
+// starting with an empty replica set prior to the 
reassignment we are
+// trying to revert.
+throw new InvalidReplicaAssignmentException("Invalid replica " 
+
+"assignment: addingReplicas contains all replicas.");
+}
+isr.add(replicas.get(0));

Review comment:
   Hmm, do we need to change the ISR here? It seems that BestLeader already 
handles unclean leader election with an empty ISR.

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import 

[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-15 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r670609446



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+private final List replicas;
+private final List isr;
+private final boolean unclean;
+
+PartitionReassignmentRevert(PartitionRegistration registration) {
+// Figure out the replica list and ISR that we will have after reverting the
+// reassignment. In general, we want to take out any replica that the reassignment
+// was adding, but keep the ones the reassignment was removing. (But see the
+// special case below.)
+Set adding = Replicas.toSet(registration.addingReplicas);
+this.replicas = new ArrayList<>(registration.replicas.length);
+this.isr = new ArrayList<>(registration.isr.length);
+for (int i = 0; i < registration.isr.length; i++) {
+int replica = registration.isr[i];
+if (!adding.contains(replica)) {
+this.isr.add(replica);
+}
+}
+for (int replica : registration.replicas) {
+if (!adding.contains(replica)) {
+this.replicas.add(replica);
+}
+}
+if (isr.isEmpty()) {
+// In the special case that all the replicas that are in the ISR are also
+// contained in addingReplicas, we choose the first remaining replica and add
+// it to the ISR. This is considered an unclean leader election. Therefore,
+// calling code must check that unclean leader election is enabled before
+// accepting the new ISR.
+if (this.replicas.isEmpty()) {
+// This should not be reachable, since it would require a partition
+// starting with an empty replica set prior to the reassignment we are
+// trying to revert.
+throw new InvalidReplicaAssignmentException("Invalid replica " +
+"assignment: addingReplicas contains all replicas.");
+}
+isr.add(replicas.get(0));

Review comment:
   It seems that setting the unclean flag is enough to bypass 
PartitionChangeBuilder?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-15 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r670647614



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionChangeBuilder handles changing partition registrations.
+ */
+public class PartitionChangeBuilder {
+public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
+if (record.isr() != null) return false;
+if (record.leader() != NO_LEADER_CHANGE) return false;
+if (record.replicas() != null) return false;
+if (record.removingReplicas() != null) return false;
+if (record.addingReplicas() != null) return false;
+return true;
+}
+
+private final PartitionRegistration partition;
+private final Uuid topicId;
+private final int partitionId;
+private final Function isAcceptableLeader;
+private final Supplier uncleanElectionOk;
+private List targetIsr;
+private List targetReplicas;
+private List targetRemoving;
+private List targetAdding;
+private boolean alwaysElectPreferredIfPossible;
+
+public PartitionChangeBuilder(PartitionRegistration partition,
+  Uuid topicId,
+  int partitionId,
+  Function 
isAcceptableLeader,
+  Supplier uncleanElectionOk) {
+this.partition = partition;
+this.topicId = topicId;
+this.partitionId = partitionId;
+this.isAcceptableLeader = isAcceptableLeader;
+this.uncleanElectionOk = uncleanElectionOk;
+this.targetIsr = Replicas.toList(partition.isr);
+this.targetReplicas = Replicas.toList(partition.replicas);
+this.targetRemoving = Replicas.toList(partition.removingReplicas);
+this.targetAdding = Replicas.toList(partition.addingReplicas);
+this.alwaysElectPreferredIfPossible = false;
+}
+
+public PartitionChangeBuilder setTargetIsr(List targetIsr) {
+this.targetIsr = targetIsr;
+return this;
+}
+
+public PartitionChangeBuilder setTargetReplicas(List 
targetReplicas) {
+this.targetReplicas = targetReplicas;
+return this;
+}
+
+public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean 
alwaysElectPreferredIfPossible) {
+this.alwaysElectPreferredIfPossible = alwaysElectPreferredIfPossible;
+return this;
+}
+
+public PartitionChangeBuilder setTargetRemoving(List 
targetRemoving) {
+this.targetRemoving = targetRemoving;
+return this;
+}
+
+public PartitionChangeBuilder setTargetAdding(List targetAdding) {
+this.targetAdding = targetAdding;
+return this;
+}
+
+boolean shouldTryElection() {
+// If the new isr doesn't have the current leader, we need to try to elect a new
+// one. Note: this also handles the case where the current leader is NO_LEADER,
+// since that value cannot appear in targetIsr.
+if (!targetIsr.contains(partition.leader)) return true;
+
+// Check if we want to try to get away from a non-preferred leader.
+if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+
+return false;
+}
+
+class BestLeader {
+final int node;
+final