[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
junrao commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r670647614 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; + +/** + * PartitionChangeBuilder handles changing partition registrations. 
+ */ +public class PartitionChangeBuilder { +public static boolean changeRecordIsNoOp(PartitionChangeRecord record) { +if (record.isr() != null) return false; +if (record.leader() != NO_LEADER_CHANGE) return false; +if (record.replicas() != null) return false; +if (record.removingReplicas() != null) return false; +if (record.addingReplicas() != null) return false; +return true; +} + +private final PartitionRegistration partition; +private final Uuid topicId; +private final int partitionId; +private final Function isAcceptableLeader; +private final Supplier uncleanElectionOk; +private List targetIsr; +private List targetReplicas; +private List targetRemoving; +private List targetAdding; +private boolean alwaysElectPreferredIfPossible; + +public PartitionChangeBuilder(PartitionRegistration partition, + Uuid topicId, + int partitionId, + Function isAcceptableLeader, + Supplier uncleanElectionOk) { +this.partition = partition; +this.topicId = topicId; +this.partitionId = partitionId; +this.isAcceptableLeader = isAcceptableLeader; +this.uncleanElectionOk = uncleanElectionOk; +this.targetIsr = Replicas.toList(partition.isr); +this.targetReplicas = Replicas.toList(partition.replicas); +this.targetRemoving = Replicas.toList(partition.removingReplicas); +this.targetAdding = Replicas.toList(partition.addingReplicas); +this.alwaysElectPreferredIfPossible = false; +} + +public PartitionChangeBuilder setTargetIsr(List targetIsr) { +this.targetIsr = targetIsr; +return this; +} + +public PartitionChangeBuilder setTargetReplicas(List targetReplicas) { +this.targetReplicas = targetReplicas; +return this; +} + +public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean alwaysElectPreferredIfPossible) { +this.alwaysElectPreferredIfPossible = alwaysElectPreferredIfPossible; +return this; +} + +public PartitionChangeBuilder setTargetRemoving(List targetRemoving) { +this.targetRemoving = targetRemoving; +return this; +} + +public PartitionChangeBuilder setTargetAdding(List 
targetAdding) { +this.targetAdding = targetAdding; +return this; +} + +boolean shouldTryElection() { +// If the new isr doesn't have the current leader, we need to try to elect a new +// one. Note: this also handles the case where the current leader is NO_LEADER, +// since that value cannot appear in targetIsr. +if (!targetIsr.contains(partition.leader)) return true; + +// Check if we want to try to get away from a non-preferred leader. +if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; + +return false; +} + +class BestLeader { +final int node; +final
[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
junrao commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r670609446 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.errors.InvalidReplicaAssignmentException; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; + +import java.util.ArrayList; +import java.util.Set; +import java.util.List; +import java.util.Objects; + + +class PartitionReassignmentRevert { +private final List replicas; +private final List isr; +private final boolean unclean; + +PartitionReassignmentRevert(PartitionRegistration registration) { +// Figure out the replica list and ISR that we will have after reverting the +// reassignment. In general, we want to take out any replica that the reassignment +// was adding, but keep the ones the reassignment was removing. (But see the +// special case below.) 
+Set adding = Replicas.toSet(registration.addingReplicas); +this.replicas = new ArrayList<>(registration.replicas.length); +this.isr = new ArrayList<>(registration.isr.length); +for (int i = 0; i < registration.isr.length; i++) { +int replica = registration.isr[i]; +if (!adding.contains(replica)) { +this.isr.add(replica); +} +} +for (int replica : registration.replicas) { +if (!adding.contains(replica)) { +this.replicas.add(replica); +} +} +if (isr.isEmpty()) { +// In the special case that all the replicas that are in the ISR are also +// contained in addingReplicas, we choose the first remaining replica and add +// it to the ISR. This is considered an unclean leader election. Therefore, +// calling code must check that unclean leader election is enabled before +// accepting the new ISR. +if (this.replicas.isEmpty()) { +// This should not be reachable, since it would require a partition +// starting with an empty replica set prior to the reassignment we are +// trying to revert. +throw new InvalidReplicaAssignmentException("Invalid replica " + +"assignment: addingReplicas contains all replicas."); +} +isr.add(replicas.get(0)); Review comment: It seems that setting the unclean flag is enough to bypass PartitionChangeBuilder? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
junrao commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r669998610 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.errors.InvalidReplicaAssignmentException; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; + +import java.util.ArrayList; +import java.util.Set; +import java.util.List; +import java.util.Objects; + + +class PartitionReassignmentRevert { +private final List replicas; +private final List isr; +private final boolean unclean; + +PartitionReassignmentRevert(PartitionRegistration registration) { +// Figure out the replica list and ISR that we will have after reverting the +// reassignment. In general, we want to take out any replica that the reassignment +// was adding, but keep the ones the reassignment was removing. (But see the +// special case below.) 
+Set adding = Replicas.toSet(registration.addingReplicas); +this.replicas = new ArrayList<>(registration.replicas.length); +this.isr = new ArrayList<>(registration.isr.length); +for (int i = 0; i < registration.isr.length; i++) { +int replica = registration.isr[i]; +if (!adding.contains(replica)) { +this.isr.add(replica); +} +} +for (int replica : registration.replicas) { +if (!adding.contains(replica)) { +this.replicas.add(replica); +} +} +if (isr.isEmpty()) { +// In the special case that all the replicas that are in the ISR are also +// contained in addingReplicas, we choose the first remaining replica and add +// it to the ISR. This is considered an unclean leader election. Therefore, +// calling code must check that unclean leader election is enabled before +// accepting the new ISR. +if (this.replicas.isEmpty()) { +// This should not be reachable, since it would require a partition +// starting with an empty replica set prior to the reassignment we are +// trying to revert. +throw new InvalidReplicaAssignmentException("Invalid replica " + +"assignment: addingReplicas contains all replicas."); +} +isr.add(replicas.get(0)); Review comment: Hmm, do we need to change isr here? It seems that BestLeader handles the unclean leader election with empty isr already. ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Supplier; + +import
[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
junrao commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r669208386 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; + +import java.util.ArrayList; +import java.util.Set; +import java.util.List; +import java.util.Objects; + + +class PartitionReassignmentRevert { +private final List replicas; +private final List isr; + +PartitionReassignmentRevert(PartitionRegistration registration) { +// Figure out the replica list and ISR that we will have after reverting the +// reassignment. In general, we want to take out any replica that the reassignment +// was adding, but keep the ones the reassignment was removing. (But see the +// special case below.) 
+Set adding = Replicas.toSet(registration.addingReplicas); +this.replicas = new ArrayList<>(registration.replicas.length); +this.isr = new ArrayList<>(registration.isr.length); +for (int i = 0; i < registration.isr.length; i++) { +int replica = registration.isr[i]; +if (!adding.contains(replica)) { +this.isr.add(replica); +} else if (i == registration.isr.length - 1 && isr.isEmpty()) { +// This is a special case where taking out all the "adding" replicas is +// not possible. The reason it is not possible is that doing so would +// create an empty ISR, which is not allowed. +// +// In this case, we leave in one of the adding replicas permanently. Review comment: One weird side effect of doing this is that it can change the replication factor for a partition to be different from the original one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
junrao commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r669155613 ## File path: metadata/src/test/java/org/apache/kafka/metadata/ReplicasTest.java ## @@ -88,9 +90,34 @@ public void testCopyWithout() { assertArrayEquals(new int[] {4, 1}, Replicas.copyWithout(new int[] {4, 2, 2, 1}, 2)); } +@Test +public void testCopyWithout2() { +assertArrayEquals(new int[] {}, Replicas.copyWithout(new int[] {}, new int[] {})); +assertArrayEquals(new int[] {}, Replicas.copyWithout(new int[] {1}, new int[] {1})); +assertArrayEquals(new int[] {1, 3}, +Replicas.copyWithout(new int[] {1, 2, 3}, new int[]{2, 4})); +assertArrayEquals(new int[] {4}, +Replicas.copyWithout(new int[] {4, 2, 2, 1}, new int[]{2, 1})); +} + @Test public void testCopyWith() { assertArrayEquals(new int[] {-1}, Replicas.copyWith(new int[] {}, -1)); assertArrayEquals(new int[] {1, 2, 3, 4}, Replicas.copyWith(new int[] {1, 2, 3}, 4)); } + +@Test +public void testToSet() { +assertEquals(Collections.emptySet(), Replicas.toSet(new int[] {})); +assertEquals(new HashSet<>(Arrays.asList(3, 1, 5)), Review comment: Could we add a test where the input has duplicates? 
## File path: metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java ## @@ -763,17 +747,227 @@ public void testValidateBadManualPartitionAssignments() throws Exception { OptionalInt.of(3))).getMessage()); } +private final static ListPartitionReassignmentsResponseData NONE_REASSIGNING = +new ListPartitionReassignmentsResponseData().setErrorMessage(null); + @Test -public void testBestLeader() { -assertEquals(2, ReplicationControlManager.bestLeader( -new int[]{1, 2, 3, 4}, new int[]{4, 2, 3}, false, __ -> true)); -assertEquals(3, ReplicationControlManager.bestLeader( -new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, __ -> true)); -assertEquals(4, ReplicationControlManager.bestLeader( -new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, r -> r == 4)); -assertEquals(-1, ReplicationControlManager.bestLeader( -new int[]{3, 4, 5}, new int[]{1, 2}, false, r -> r == 4)); -assertEquals(4, ReplicationControlManager.bestLeader( -new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4)); +public void testReassignPartitions() throws Exception { +ReplicationControlTestContext ctx = new ReplicationControlTestContext(); +ReplicationControlManager replication = ctx.replicationControl; +ctx.registerBrokers(0, 1, 2, 3); +ctx.unfenceBrokers(0, 1, 2, 3); +Uuid fooId = ctx.createTestTopic("foo", new int[][] { +new int[] {1, 2, 3}, new int[] {3, 2, 1}}).topicId(); +ctx.createTestTopic("bar", new int[][] { +new int[] {1, 2, 3}}).topicId(); +assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null)); +ControllerResult alterResult = +replication.alterPartitionReassignments( +new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList( +new ReassignableTopic().setName("foo").setPartitions(Arrays.asList( +new ReassignablePartition().setPartitionIndex(0). +setReplicas(Arrays.asList(3, 2, 1)), +new ReassignablePartition().setPartitionIndex(1). +setReplicas(Arrays.asList(0, 2, 1)), +new ReassignablePartition().setPartitionIndex(2). 
+setReplicas(Arrays.asList(0, 2, 1)))), +new ReassignableTopic().setName("bar")))); +assertEquals(new AlterPartitionReassignmentsResponseData(). +setErrorMessage(null).setResponses(Arrays.asList( +new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList( +new ReassignablePartitionResponse().setPartitionIndex(0). +setErrorMessage(null), +new ReassignablePartitionResponse().setPartitionIndex(1). +setErrorMessage(null), +new ReassignablePartitionResponse().setPartitionIndex(2). +setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()). +setErrorMessage("Unable to find partition foo:2."))), +new ReassignableTopicResponse(). +setName("bar"))), +alterResult.response()); +ctx.replay(alterResult.records()); +ListPartitionReassignmentsResponseData currentReassigning = +new ListPartitionReassignmentsResponseData().setErrorMessage(null). +
[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
junrao commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r668323818 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; + +/** + * PartitionMutator handles changing partition registrations. 
+ */ +public class PartitionChangeBuilder { +public static boolean changeRecordIsNoOp(PartitionChangeRecord record) { +if (record.isr() != null) return false; +if (record.leader() != NO_LEADER_CHANGE) return false; +if (record.replicas() != null) return false; +if (record.removingReplicas() != null) return false; +if (record.addingReplicas() != null) return false; +return true; +} + +private final PartitionRegistration partition; +private final Uuid topicId; +private final int partitionId; +private final Function isAcceptableLeader; +private final Supplier uncleanElectionOk; +private List targetIsr; +private List targetReplicas; +private List targetRemoving; +private List targetAdding; +private boolean alwaysElectPreferredIfPossible; + +public PartitionChangeBuilder(PartitionRegistration partition, + Uuid topicId, + int partitionId, + Function isAcceptableLeader, + Supplier uncleanElectionOk) { +this.partition = partition; +this.topicId = topicId; +this.partitionId = partitionId; +this.isAcceptableLeader = isAcceptableLeader; +this.uncleanElectionOk = uncleanElectionOk; +this.targetIsr = Replicas.toList(partition.isr); +this.targetReplicas = Replicas.toList(partition.replicas); +this.targetRemoving = Replicas.toList(partition.removingReplicas); +this.targetAdding = Replicas.toList(partition.addingReplicas); +this.alwaysElectPreferredIfPossible = false; +} + +public PartitionChangeBuilder setTargetIsr(List targetIsr) { +this.targetIsr = targetIsr; +return this; +} + +public PartitionChangeBuilder setTargetReplicas(List targetReplicas) { +this.targetReplicas = targetReplicas; +return this; +} + +public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean alwaysElectPreferredIfPossible) { Review comment: Do we need this since it's only used in tests? 
## File path: metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java ## @@ -105,4 +105,42 @@ public void testToLeaderAndIsrPartitionState() { setIsNew(false).toString(), b.toLeaderAndIsrPartitionState(new TopicPartition("bar", 0), false).toString()); } + +@Test +public void testMergePartitionChangeRecordWithReassignmentData() { +PartitionRegistration partition0 = new PartitionRegistration(new int[] {1, 2, 3}, +new int[] {1, 2, 3}, Replicas.NONE, Replicas.NONE, 1, 100, 200); +PartitionRegistration partition1 = partition0.merge(new PartitionChangeRecord(). +setRemovingReplicas(Collections.singletonList(3)). +setAddingReplicas(Collections.singletonList(4)). +setReplicas(Arrays.asList(1, 2, 3, 4))); +assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4}, +new int[] {
[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
junrao commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r667223580 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context, } } +PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic, + int partitionId, + PartitionControlInfo partition, + int[] newIsr, + Function isAcceptableLeader) { +PartitionChangeRecord record = new PartitionChangeRecord(). +setTopicId(topic.id). +setPartitionId(partitionId); +int[] newReplicas = partition.replicas; +if (partition.isrChangeCompletesReassignment(newIsr)) { +if (partition.addingReplicas.length > 0) { +record.setAddingReplicas(Collections.emptyList()); +} +if (partition.removingReplicas.length > 0) { +record.setRemovingReplicas(Collections.emptyList()); +newIsr = Replicas.copyWithout(newIsr, partition.removingReplicas); +newReplicas = Replicas.copyWithout(partition.replicas, partition.removingReplicas); +} +} +int newLeader; +if (Replicas.contains(newIsr, partition.leader)) { +// If the current leader is good, don't change. +newLeader = partition.leader; +} else { +// Choose a new leader. +boolean uncleanOk = configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name); +newLeader = bestLeader(newReplicas, newIsr, uncleanOk, isAcceptableLeader); +} +if (!electionWasClean(newLeader, newIsr)) { +// After an unclean leader election, the ISR is reset to just the new leader. +newIsr = new int[] {newLeader}; +} else if (newIsr.length == 0) { +// We never want to shrink the ISR to size 0. 
+newIsr = partition.isr; +} +if (newLeader != partition.leader) record.setLeader(newLeader); +if (!Arrays.equals(newIsr, partition.isr)) { +record.setIsr(Replicas.toList(newIsr)); +} +if (!Arrays.equals(newReplicas, partition.replicas)) { +record.setReplicas(Replicas.toList(newReplicas)); +} +return record; +} + +ControllerResult +alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) { +List records = new ArrayList<>(); +AlterPartitionReassignmentsResponseData result = +new AlterPartitionReassignmentsResponseData().setErrorMessage(null); +int successfulAlterations = 0, totalAlterations = 0; +for (ReassignableTopic topic : request.topics()) { +ReassignableTopicResponse topicResponse = new ReassignableTopicResponse(). +setName(topic.name()); +for (ReassignablePartition partition : topic.partitions()) { +ApiError error = ApiError.NONE; +try { +alterPartitionReassignment(topic.name(), partition, records); +successfulAlterations++; +} catch (Throwable e) { +log.info("Unable to alter partition reassignment for " + +topic.name() + ":" + partition.partitionIndex() + " because " + +"of an " + e.getClass().getSimpleName() + " error: " + e.getMessage()); +error = ApiError.fromThrowable(e); +} +totalAlterations++; +topicResponse.partitions().add(new ReassignablePartitionResponse(). +setPartitionIndex(partition.partitionIndex()). +setErrorCode(error.error().code()). 
+setErrorMessage(error.message())); +} +result.responses().add(topicResponse); +} +log.info("Successfully altered {} out of {} partition reassignment(s).", +successfulAlterations, totalAlterations); +return ControllerResult.atomicOf(records, result); +} + +void alterPartitionReassignment(String topicName, +ReassignablePartition partition, +List records) { +Uuid topicId = topicsByName.get(topicName); +if (topicId == null) { +throw new UnknownTopicOrPartitionException("Unable to find a topic " + +"named " + topicName + "."); +} +TopicControlInfo topicInfo = topics.get(topicId); +if (topicInfo == null) { +throw new UnknownTopicOrPartitionException("Unable to find a topic " +
[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
junrao commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r666339593 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context, } } +PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic, + int partitionId, + PartitionControlInfo partition, + int[] newIsr, + Function isAcceptableLeader) { +PartitionChangeRecord record = new PartitionChangeRecord(). +setTopicId(topic.id). +setPartitionId(partitionId); +int[] newReplicas = partition.replicas; +if (partition.isrChangeCompletesReassignment(newIsr)) { +if (partition.addingReplicas.length > 0) { +record.setAddingReplicas(Collections.emptyList()); +} +if (partition.removingReplicas.length > 0) { +record.setRemovingReplicas(Collections.emptyList()); +newIsr = Replicas.copyWithout(newIsr, partition.removingReplicas); +newReplicas = Replicas.copyWithout(partition.replicas, partition.removingReplicas); +} +} +int newLeader; +if (Replicas.contains(newIsr, partition.leader)) { +// If the current leader is good, don't change. +newLeader = partition.leader; +} else { +// Choose a new leader. +boolean uncleanOk = configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name); +newLeader = bestLeader(newReplicas, newIsr, uncleanOk, isAcceptableLeader); +} +if (!electionWasClean(newLeader, newIsr)) { +// After an unclean leader election, the ISR is reset to just the new leader. +newIsr = new int[] {newLeader}; +} else if (newIsr.length == 0) { +// We never want to shrink the ISR to size 0. 
+newIsr = partition.isr; +} +if (newLeader != partition.leader) record.setLeader(newLeader); +if (!Arrays.equals(newIsr, partition.isr)) { +record.setIsr(Replicas.toList(newIsr)); +} +if (!Arrays.equals(newReplicas, partition.replicas)) { +record.setReplicas(Replicas.toList(newReplicas)); +} +return record; +} + +ControllerResult +alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) { +List records = new ArrayList<>(); +AlterPartitionReassignmentsResponseData result = +new AlterPartitionReassignmentsResponseData().setErrorMessage(null); +int successfulAlterations = 0, totalAlterations = 0; +for (ReassignableTopic topic : request.topics()) { +ReassignableTopicResponse topicResponse = new ReassignableTopicResponse(). +setName(topic.name()); +for (ReassignablePartition partition : topic.partitions()) { +ApiError error = ApiError.NONE; +try { +alterPartitionReassignment(topic.name(), partition, records); +successfulAlterations++; +} catch (Throwable e) { +log.info("Unable to alter partition reassignment for " + +topic.name() + ":" + partition.partitionIndex() + " because " + +"of an " + e.getClass().getSimpleName() + " error: " + e.getMessage()); +error = ApiError.fromThrowable(e); +} +totalAlterations++; +topicResponse.partitions().add(new ReassignablePartitionResponse(). +setPartitionIndex(partition.partitionIndex()). +setErrorCode(error.error().code()). 
+setErrorMessage(error.message())); +} +result.responses().add(topicResponse); +} +log.info("Successfully altered {} out of {} partition reassignment(s).", +successfulAlterations, totalAlterations); +return ControllerResult.atomicOf(records, result); +} + +void alterPartitionReassignment(String topicName, +ReassignablePartition partition, +List records) { +Uuid topicId = topicsByName.get(topicName); +if (topicId == null) { +throw new UnknownTopicOrPartitionException("Unable to find a topic " + +"named " + topicName + "."); +} +TopicControlInfo topicInfo = topics.get(topicId); +if (topicInfo == null) { +throw new UnknownTopicOrPartitionException("Unable to find a topic " +
[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
junrao commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r644352228 ## File path: metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java ## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; +import java.util.TreeMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; + + +class RemovingAndAddingReplicas { +private final Set removing; +private final Set adding; + +RemovingAndAddingReplicas(int[] removing, int[] adding) { +this.removing = new HashSet<>(); +for (int replica : removing) { +this.removing.add(replica); +} +this.adding = new HashSet<>(); +for (int replica : adding) { +this.adding.add(replica); +} +} + +RemovingAndAddingReplicas(List removing, List adding) { +this.removing = new HashSet<>(removing); +this.adding = new HashSet<>(adding); +} + +/** + * Calculate what replicas need to be added and removed to reach a specific target + * replica list. + * + * @param currentReplicas The current replica list. + * @param targetReplicas The target replica list. 
+ * + * @return An object containing the removing and adding replicas. + */ +static RemovingAndAddingReplicas forTarget(List currentReplicas, + List targetReplicas) { +List removingReplicas = new ArrayList<>(); +List addingReplicas = new ArrayList<>(); +List sortedCurrentReplicas = new ArrayList<>(currentReplicas); +sortedCurrentReplicas.sort(Integer::compareTo); +List sortedTargetReplicas = new ArrayList<>(targetReplicas); +sortedTargetReplicas.sort(Integer::compareTo); +int i = 0, j = 0; +while (true) { +if (i < sortedCurrentReplicas.size()) { +int currentReplica = sortedCurrentReplicas.get(i); +if (j < sortedTargetReplicas.size()) { +int targetReplica = sortedTargetReplicas.get(j); +if (currentReplica < targetReplica) { +removingReplicas.add(currentReplica); +i++; +} else if (currentReplica > targetReplica) { +addingReplicas.add(targetReplica); +j++; +} else { +i++; +j++; +} +} else { +removingReplicas.add(currentReplica); +i++; +} +} else if (j < sortedTargetReplicas.size()) { +int targetReplica = sortedTargetReplicas.get(j); +addingReplicas.add(targetReplica); +j++; +} else { +break; +} +} +return new RemovingAndAddingReplicas(removingReplicas, addingReplicas); +} + +/** + * Calculate the merged replica list following a reassignment. + * + * The merged list will contain all of the target replicas, in the order they appear + * in the target list. It will also contain existing replicas that are scheduled to + * be removed. + * + * If a removing replica was in position X in the original replica list, it will + * appear in the merged list following the appearance of X non-new replicas. Review comment: The logic seems a bit complicated and I am not sure how useful it is. For example, if current and target are (1, 3, 4) and (1, 2, 4), this method returns (1, 3, 2, 4). The old code calculates merged replicas simply as (target ++ originReplicas).distinct. So, it would be (1,2,4,3). Any benefit of the new way of merging replicas? 
## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context, }
[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
junrao commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r644352228 ## File path: metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java ## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; +import java.util.TreeMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; + + +class RemovingAndAddingReplicas { +private final Set removing; +private final Set adding; + +RemovingAndAddingReplicas(int[] removing, int[] adding) { +this.removing = new HashSet<>(); +for (int replica : removing) { +this.removing.add(replica); +} +this.adding = new HashSet<>(); +for (int replica : adding) { +this.adding.add(replica); +} +} + +RemovingAndAddingReplicas(List removing, List adding) { +this.removing = new HashSet<>(removing); +this.adding = new HashSet<>(adding); +} + +/** + * Calculate what replicas need to be added and removed to reach a specific target + * replica list. + * + * @param currentReplicas The current replica list. + * @param targetReplicas The target replica list. 
+ * + * @return An object containing the removing and adding replicas. + */ +static RemovingAndAddingReplicas forTarget(List currentReplicas, + List targetReplicas) { +List removingReplicas = new ArrayList<>(); +List addingReplicas = new ArrayList<>(); +List sortedCurrentReplicas = new ArrayList<>(currentReplicas); +sortedCurrentReplicas.sort(Integer::compareTo); +List sortedTargetReplicas = new ArrayList<>(targetReplicas); +sortedTargetReplicas.sort(Integer::compareTo); +int i = 0, j = 0; +while (true) { +if (i < sortedCurrentReplicas.size()) { +int currentReplica = sortedCurrentReplicas.get(i); +if (j < sortedTargetReplicas.size()) { +int targetReplica = sortedTargetReplicas.get(j); +if (currentReplica < targetReplica) { +removingReplicas.add(currentReplica); +i++; +} else if (currentReplica > targetReplica) { +addingReplicas.add(targetReplica); +j++; +} else { +i++; +j++; +} +} else { +removingReplicas.add(currentReplica); +i++; +} +} else if (j < sortedTargetReplicas.size()) { +int targetReplica = sortedTargetReplicas.get(j); +addingReplicas.add(targetReplica); +j++; +} else { +break; +} +} +return new RemovingAndAddingReplicas(removingReplicas, addingReplicas); +} + +/** + * Calculate the merged replica list following a reassignment. + * + * The merged list will contain all of the target replicas, in the order they appear + * in the target list. It will also contain existing replicas that are scheduled to + * be removed. + * + * If a removing replica was in position X in the original replica list, it will + * appear in the merged list following the appearance of X non-new replicas. Review comment: The logic seems a bit complicated and I am not sure how useful it is. For example, if current and target are (1, 3, 4) and (1, 2, 4), this method returns (1, 3, 2, 4). The old code calculates merged replicas simply as (target ++ originReplicas).distinct. So, it would be (1,2,4,3). Any benefit of the new way of merging replicas? 
## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context, }