This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8d38189 MINOR: clean up some replication code (#10564)
8d38189 is described below
commit 8d38189eddd66d8ae53749f15bfff557f102a936
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Apr 29 11:20:30 2021 -0700
MINOR: clean up some replication code (#10564)
Centralize leader and ISR changes in generateLeaderAndIsrUpdates.
Consolidate handleNodeDeactivated and handleNodeActivated into this
function.
Rename BrokersToIsrs#noLeaderIterator to
BrokersToIsrs#partitionsWithNoLeader.
Create BrokersToIsrs#partitionsLedByBroker and
BrokersToIsrs#partitionsWithBrokerInIsr.
In ReplicationControlManagerTest, createTestTopic should be a member
function of ReplicationControlTestContext. It should invoke
ReplicationControlTestContext#replay so that records are applied to all
parts of the test context.
Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/controller/BrokersToIsrs.java | 10 +-
.../controller/ConfigurationControlManager.java | 4 +
.../controller/ReplicationControlManager.java | 300 +++++++++++----------
.../apache/kafka/controller/BrokersToIsrsTest.java | 4 +-
.../kafka/controller/QuorumControllerTest.java | 2 +-
.../controller/ReplicationControlManagerTest.java | 97 ++++---
6 files changed, 241 insertions(+), 176 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index 9d54c20..d8e0319 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -311,10 +311,18 @@ public class BrokersToIsrs {
return new PartitionsOnReplicaIterator(topicMap, leadersOnly);
}
- PartitionsOnReplicaIterator noLeaderIterator() {
+ PartitionsOnReplicaIterator partitionsWithNoLeader() {
return iterator(NO_LEADER, true);
}
+ PartitionsOnReplicaIterator partitionsLedByBroker(int brokerId) {
+ return iterator(brokerId, true);
+ }
+
+ PartitionsOnReplicaIterator partitionsWithBrokerInIsr(int brokerId) {
+ return iterator(brokerId, false);
+ }
+
boolean hasLeaderships(int brokerId) {
return iterator(brokerId, true).hasNext();
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 3e9e9e1..b53926e 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -374,6 +374,10 @@ public class ConfigurationControlManager {
configData.remove(new ConfigResource(Type.TOPIC, name));
}
+ boolean uncleanLeaderElectionEnabledForTopic(String name) {
+ return false; // TODO: support configuring unclean leader election.
+ }
+
class ConfigurationControlIterator implements
Iterator<List<ApiMessageAndVersion>> {
private final long epoch;
private final Iterator<Entry<ConfigResource, TimelineHashMap<String,
String>>> iterator;
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index f169d1f..ea94a00 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -72,9 +72,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.function.Function;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
@@ -172,47 +174,54 @@ public class ReplicationControlManager {
StringBuilder builder = new StringBuilder();
String prefix = "";
if (!Arrays.equals(replicas, prev.replicas)) {
-
builder.append(prefix).append("oldReplicas=").append(Arrays.toString(prev.replicas));
+ builder.append(prefix).append("replicas: ").
+ append(Arrays.toString(prev.replicas)).
+ append(" -> ").append(Arrays.toString(replicas));
prefix = ", ";
-
builder.append(prefix).append("newReplicas=").append(Arrays.toString(replicas));
}
if (!Arrays.equals(isr, prev.isr)) {
-
builder.append(prefix).append("oldIsr=").append(Arrays.toString(prev.isr));
+ builder.append(prefix).append("isr: ").
+ append(Arrays.toString(prev.isr)).
+ append(" -> ").append(Arrays.toString(isr));
prefix = ", ";
-
builder.append(prefix).append("newIsr=").append(Arrays.toString(isr));
}
if (!Arrays.equals(removingReplicas, prev.removingReplicas)) {
- builder.append(prefix).append("oldRemovingReplicas=").
- append(Arrays.toString(prev.removingReplicas));
+ builder.append(prefix).append("removingReplicas: ").
+ append(Arrays.toString(prev.removingReplicas)).
+ append(" -> ").append(Arrays.toString(removingReplicas));
prefix = ", ";
- builder.append(prefix).append("newRemovingReplicas=").
- append(Arrays.toString(removingReplicas));
}
if (!Arrays.equals(addingReplicas, prev.addingReplicas)) {
- builder.append(prefix).append("oldAddingReplicas=").
- append(Arrays.toString(prev.addingReplicas));
+ builder.append(prefix).append("addingReplicas: ").
+ append(Arrays.toString(prev.addingReplicas)).
+ append(" -> ").append(Arrays.toString(addingReplicas));
prefix = ", ";
- builder.append(prefix).append("newAddingReplicas=").
- append(Arrays.toString(addingReplicas));
}
if (leader != prev.leader) {
-
builder.append(prefix).append("oldLeader=").append(prev.leader);
+ builder.append(prefix).append("leader: ").
+ append(prev.leader).append(" -> ").append(leader);
prefix = ", ";
- builder.append(prefix).append("newLeader=").append(leader);
}
if (leaderEpoch != prev.leaderEpoch) {
-
builder.append(prefix).append("oldLeaderEpoch=").append(prev.leaderEpoch);
+ builder.append(prefix).append("leaderEpoch: ").
+ append(prev.leaderEpoch).append(" ->
").append(leaderEpoch);
prefix = ", ";
-
builder.append(prefix).append("newLeaderEpoch=").append(leaderEpoch);
}
if (partitionEpoch != prev.partitionEpoch) {
-
builder.append(prefix).append("oldPartitionEpoch=").append(prev.partitionEpoch);
- prefix = ", ";
-
builder.append(prefix).append("newPartitionEpoch=").append(partitionEpoch);
+ builder.append(prefix).append("partitionEpoch: ").
+ append(prev.partitionEpoch).append(" ->
").append(partitionEpoch);
}
return builder.toString();
}
+ void maybeLogPartitionChange(Logger log, String description,
PartitionControlInfo prev) {
+ if (!electionWasClean(leader, prev.isr)) {
+ log.info("UNCLEAN partition change for {}: {}", description,
diff(prev));
+ } else if (log.isDebugEnabled()) {
+ log.debug("partition change for {}: {}", description,
diff(prev));
+ }
+ }
+
boolean hasLeader() {
return leader != NO_LEADER;
}
@@ -231,7 +240,13 @@ public class ReplicationControlManager {
public boolean equals(Object o) {
if (!(o instanceof PartitionControlInfo)) return false;
PartitionControlInfo other = (PartitionControlInfo) o;
- return diff(other).isEmpty();
+ return Arrays.equals(replicas, other.replicas) &&
+ Arrays.equals(isr, other.isr) &&
+ Arrays.equals(removingReplicas, other.removingReplicas) &&
+ Arrays.equals(addingReplicas, other.addingReplicas) &&
+ leader == other.leader &&
+ leaderEpoch == other.leaderEpoch &&
+ partitionEpoch == other.partitionEpoch;
}
@Override
@@ -310,7 +325,7 @@ public class ReplicationControlManager {
topicsByName.put(record.name(), record.topicId());
topics.put(record.topicId(),
new TopicControlInfo(record.name(), snapshotRegistry,
record.topicId()));
- log.info("Created topic {} with ID {}.", record.name(),
record.topicId());
+ log.info("Created topic {} with topic ID {}.", record.name(),
record.topicId());
}
public void replay(PartitionRecord record) {
@@ -321,22 +336,18 @@ public class ReplicationControlManager {
}
PartitionControlInfo newPartInfo = new PartitionControlInfo(record);
PartitionControlInfo prevPartInfo =
topicInfo.parts.get(record.partitionId());
+ String description = topicInfo.name + "-" + record.partitionId() +
+ " with topic ID " + record.topicId();
if (prevPartInfo == null) {
- log.info("Created partition {}:{} with {}.", record.topicId(),
- record.partitionId(), newPartInfo.toString());
+ log.info("Created partition {} and {}.", description, newPartInfo);
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), null,
newPartInfo.isr, NO_LEADER, newPartInfo.leader);
- } else {
- String diff = newPartInfo.diff(prevPartInfo);
- if (!diff.isEmpty()) {
- log.info("Modified partition {}:{}: {}.", record.topicId(),
- record.partitionId(), diff);
- topicInfo.parts.put(record.partitionId(), newPartInfo);
- brokersToIsrs.update(record.topicId(), record.partitionId(),
- prevPartInfo.isr, newPartInfo.isr, prevPartInfo.leader,
- newPartInfo.leader);
- }
+ } else if (!newPartInfo.equals(prevPartInfo)) {
+ newPartInfo.maybeLogPartitionChange(log, description,
prevPartInfo);
+ topicInfo.parts.put(record.partitionId(), newPartInfo);
+ brokersToIsrs.update(record.topicId(), record.partitionId(),
prevPartInfo.isr,
+ newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader);
}
}
@@ -356,7 +367,9 @@ public class ReplicationControlManager {
brokersToIsrs.update(record.topicId(), record.partitionId(),
prevPartitionInfo.isr, newPartitionInfo.isr,
prevPartitionInfo.leader,
newPartitionInfo.leader);
- log.debug("Applied ISR change record: {}", record.toString());
+ String topicPart = topicInfo.name + "-" + record.partitionId() + "
with topic ID " +
+ record.topicId();
+ newPartitionInfo.maybeLogPartitionChange(log, topicPart,
prevPartitionInfo);
}
public void replay(RemoveTopicRecord record) {
@@ -723,7 +736,8 @@ public class ReplicationControlManager {
if (brokerRegistration == null) {
throw new RuntimeException("Can't find broker registration for
broker " + brokerId);
}
- handleNodeDeactivated(brokerId, records);
+ generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER,
records,
+ brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
setId(brokerId).setEpoch(brokerRegistration.epoch()), (short) 0));
}
@@ -740,61 +754,13 @@ public class ReplicationControlManager {
*/
void handleBrokerUnregistered(int brokerId, long brokerEpoch,
List<ApiMessageAndVersion> records) {
- handleNodeDeactivated(brokerId, records);
+ generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId,
NO_LEADER, records,
+ brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
records.add(new ApiMessageAndVersion(new UnregisterBrokerRecord().
setBrokerId(brokerId).setBrokerEpoch(brokerEpoch), (short) 0));
}
/**
- * Handle a broker being deactivated. This means we remove it from any ISR
that has
- * more than one element. We do not remove the broker from ISRs where it
is the only
- * member since this would preclude clean leader election in the future.
- * It is removed as the leader for all partitions it leads.
- *
- * @param brokerId The broker id.
- * @param records The record list to append to.
- */
- void handleNodeDeactivated(int brokerId, List<ApiMessageAndVersion>
records) {
- Iterator<TopicIdPartition> iterator = brokersToIsrs.iterator(brokerId,
false);
- while (iterator.hasNext()) {
- TopicIdPartition topicIdPartition = iterator.next();
- TopicControlInfo topic = topics.get(topicIdPartition.topicId());
- if (topic == null) {
- throw new RuntimeException("Topic ID " +
topicIdPartition.topicId() + " existed in " +
- "isrMembers, but not in the topics map.");
- }
- PartitionControlInfo partition =
topic.parts.get(topicIdPartition.partitionId());
- if (partition == null) {
- throw new RuntimeException("Partition " + topicIdPartition +
- " existed in isrMembers, but not in the partitions map.");
- }
- PartitionChangeRecord record = new PartitionChangeRecord().
- setPartitionId(topicIdPartition.partitionId()).
- setTopicId(topic.id);
- int[] newIsr = Replicas.copyWithout(partition.isr, brokerId);
- if (newIsr.length == 0) {
- // We don't want to shrink the ISR to size 0. So, leave the
node in the ISR.
- if (record.leader() != NO_LEADER) {
- // The partition is now leaderless, so set its leader to
-1.
- record.setLeader(-1);
- records.add(new ApiMessageAndVersion(record, (short) 0));
- }
- } else {
- record.setIsr(Replicas.toList(newIsr));
- if (partition.leader == brokerId) {
- // The fenced node will no longer be the leader.
- int newLeader = bestLeader(partition.replicas, newIsr,
false);
- record.setLeader(newLeader);
- } else {
- // Bump the partition leader epoch.
- record.setLeader(partition.leader);
- }
- records.add(new ApiMessageAndVersion(record, (short) 0));
- }
- }
- }
-
- /**
* Generate the appropriate records to handle a broker becoming unfenced.
*
* First, we create an UnfenceBrokerRecord. Then, we check if if there are
any
@@ -808,43 +774,12 @@ public class ReplicationControlManager {
void handleBrokerUnfenced(int brokerId, long brokerEpoch,
List<ApiMessageAndVersion> records) {
records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().
setId(brokerId).setEpoch(brokerEpoch), (short) 0));
- handleNodeActivated(brokerId, records);
- }
-
- /**
- * Handle a broker being activated. This means we check if it can become
the leader
- * for any partition that currently has no leader (aka offline partition).
- *
- * @param brokerId The broker id.
- * @param records The record list to append to.
- */
- void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records)
{
- Iterator<TopicIdPartition> iterator = brokersToIsrs.noLeaderIterator();
- while (iterator.hasNext()) {
- TopicIdPartition topicIdPartition = iterator.next();
- TopicControlInfo topic = topics.get(topicIdPartition.topicId());
- if (topic == null) {
- throw new RuntimeException("Topic ID " +
topicIdPartition.topicId() + " existed in " +
- "isrMembers, but not in the topics map.");
- }
- PartitionControlInfo partition =
topic.parts.get(topicIdPartition.partitionId());
- if (partition == null) {
- throw new RuntimeException("Partition " + topicIdPartition +
- " existed in isrMembers, but not in the partitions map.");
- }
- // TODO: if this partition is configured for unclean leader
election,
- // check the replica set rather than the ISR.
- if (Replicas.contains(partition.isr, brokerId)) {
- records.add(new ApiMessageAndVersion(new
PartitionChangeRecord().
- setPartitionId(topicIdPartition.partitionId()).
- setTopicId(topic.id).
- setLeader(brokerId), (short) 0));
- }
- }
+ generateLeaderAndIsrUpdates("handleBrokerUnfenced", NO_LEADER,
brokerId, records,
+ brokersToIsrs.partitionsWithNoLeader());
}
ControllerResult<ElectLeadersResponseData>
electLeaders(ElectLeadersRequestData request) {
- boolean unclean = electionIsUnclean(request.electionType());
+ boolean uncleanOk = electionTypeIsUnclean(request.electionType());
List<ApiMessageAndVersion> records = new ArrayList<>();
ElectLeadersResponseData response = new ElectLeadersResponseData();
for (TopicPartitions topic : request.topicPartitions()) {
@@ -852,7 +787,7 @@ public class ReplicationControlManager {
new ReplicaElectionResult().setTopic(topic.topic());
response.replicaElectionResults().add(topicResults);
for (int partitionId : topic.partitions()) {
- ApiError error = electLeader(topic.topic(), partitionId,
unclean, records);
+ ApiError error = electLeader(topic.topic(), partitionId,
uncleanOk, records);
topicResults.partitionResult().add(new PartitionResult().
setPartitionId(partitionId).
setErrorCode(error.error().code()).
@@ -862,7 +797,7 @@ public class ReplicationControlManager {
return ControllerResult.of(records, response);
}
- static boolean electionIsUnclean(byte electionType) {
+ static boolean electionTypeIsUnclean(byte electionType) {
ElectionType type;
try {
type = ElectionType.valueOf(electionType);
@@ -872,7 +807,7 @@ public class ReplicationControlManager {
return type == ElectionType.UNCLEAN;
}
- ApiError electLeader(String topic, int partitionId, boolean unclean,
+ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
List<ApiMessageAndVersion> records) {
Uuid topicId = topicsByName.get(topic);
if (topicId == null) {
@@ -889,7 +824,8 @@ public class ReplicationControlManager {
return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such partition as " + topic + "-" + partitionId);
}
- int newLeader = bestLeader(partitionInfo.replicas, partitionInfo.isr,
unclean);
+ int newLeader = bestLeader(partitionInfo.replicas, partitionInfo.isr,
uncleanOk,
+ r -> clusterControl.unfenced(r));
if (newLeader == NO_LEADER) {
// If we can't find any leader for the partition, return an error.
return new ApiError(Errors.LEADER_NOT_AVAILABLE,
@@ -907,13 +843,13 @@ public class ReplicationControlManager {
}
PartitionChangeRecord record = new PartitionChangeRecord().
setPartitionId(partitionId).
- setTopicId(topicId);
- if (unclean && !Replicas.contains(partitionInfo.isr, newLeader)) {
- // If the election was unclean, we may have to forcibly add the
replica to
- // the ISR. This can result in data loss!
+ setTopicId(topicId).
+ setLeader(newLeader);
+ if (!electionWasClean(newLeader, partitionInfo.isr)) {
+ // If the election was unclean, we have to forcibly set the ISR to
just the
+ // new leader. This can result in data loss!
record.setIsr(Collections.singletonList(newLeader));
}
- record.setLeader(newLeader);
records.add(new ApiMessageAndVersion(record, (short) 0));
return ApiError.NONE;
}
@@ -936,10 +872,8 @@ public class ReplicationControlManager {
handleBrokerUnfenced(brokerId, brokerEpoch, records);
break;
case CONTROLLED_SHUTDOWN:
- // Note: we always bump the leader epoch of each partition
that the
- // shutting down broker is in here. This prevents the
broker from
- // getting re-added to the ISR later.
- handleNodeDeactivated(brokerId, records);
+ generateLeaderAndIsrUpdates("enterControlledShutdown[" +
brokerId + "]",
+ brokerId, NO_LEADER, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
break;
case SHUTDOWN_NOW:
handleBrokerFenced(brokerId, records);
@@ -957,22 +891,27 @@ public class ReplicationControlManager {
return ControllerResult.of(records, reply);
}
- int bestLeader(int[] replicas, int[] isr, boolean unclean) {
+ static boolean isGoodLeader(int[] isr, int leader) {
+ return Replicas.contains(isr, leader);
+ }
+
+ static int bestLeader(int[] replicas, int[] isr, boolean uncleanOk,
+ Function<Integer, Boolean> isAcceptableLeader) {
+ int bestUnclean = NO_LEADER;
for (int i = 0; i < replicas.length; i++) {
int replica = replicas[i];
- if (Replicas.contains(isr, replica)) {
- return replica;
- }
- }
- if (unclean) {
- for (int i = 0; i < replicas.length; i++) {
- int replica = replicas[i];
- if (clusterControl.unfenced(replica)) {
+ if (isAcceptableLeader.apply(replica)) {
+ if (bestUnclean == NO_LEADER) bestUnclean = replica;
+ if (Replicas.contains(isr, replica)) {
return replica;
}
}
}
- return NO_LEADER;
+ return uncleanOk ? bestUnclean : NO_LEADER;
+ }
+
+ static boolean electionWasClean(int newLeader, int[] isr) {
+ return newLeader == NO_LEADER || Replicas.contains(isr, newLeader);
}
public ControllerResult<Void> unregisterBroker(int brokerId) {
@@ -1119,6 +1058,83 @@ public class ReplicationControlManager {
}
}
+ /**
+ * Iterate over a sequence of partitions and generate ISR changes and/or
leader
+ * changes if necessary.
+ *
+ * @param context A human-readable context string used in log4j
logging.
+ * @param brokerToRemove NO_LEADER if no broker is being removed; the
ID of the
+ * broker to remove from the ISR and leadership,
otherwise.
+ * @param brokerToAdd NO_LEADER if no broker is being added; the ID
of the
+ * broker which is now eligible to be a leader,
otherwise.
+ * @param records A list of records which we will append to.
+ * @param iterator The iterator containing the partitions to
examine.
+ */
+ void generateLeaderAndIsrUpdates(String context,
+ int brokerToRemove,
+ int brokerToAdd,
+ List<ApiMessageAndVersion> records,
+ Iterator<TopicIdPartition> iterator) {
+ int oldSize = records.size();
+ Function<Integer, Boolean> isAcceptableLeader =
+ r -> (r != brokerToRemove) && (r == brokerToAdd ||
clusterControl.unfenced(r));
+ while (iterator.hasNext()) {
+ TopicIdPartition topicIdPart = iterator.next();
+ TopicControlInfo topic = topics.get(topicIdPart.topicId());
+ if (topic == null) {
+ throw new RuntimeException("Topic ID " + topicIdPart.topicId()
+
+ " existed in isrMembers, but not in the topics map.");
+ }
+ PartitionControlInfo partition =
topic.parts.get(topicIdPart.partitionId());
+ if (partition == null) {
+ throw new RuntimeException("Partition " + topicIdPart +
+ " existed in isrMembers, but not in the partitions map.");
+ }
+ int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemove);
+ int newLeader;
+ if (isGoodLeader(newIsr, partition.leader)) {
+ // If the current leader is good, don't change.
+ newLeader = partition.leader;
+ } else {
+ // Choose a new leader.
+ boolean uncleanOk =
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+ newLeader = bestLeader(partition.replicas, newIsr, uncleanOk,
isAcceptableLeader);
+ }
+ if (!electionWasClean(newLeader, newIsr)) {
+ // After an unclean leader election, the ISR is reset to just
the new leader.
+ newIsr = new int[] {newLeader};
+ } else if (newIsr.length == 0) {
+ // We never want to shrink the ISR to size 0.
+ newIsr = partition.isr;
+ }
+ PartitionChangeRecord record = new PartitionChangeRecord().
+ setPartitionId(topicIdPart.partitionId()).
+ setTopicId(topic.id);
+ if (newLeader != partition.leader) record.setLeader(newLeader);
+ if (!Arrays.equals(newIsr, partition.isr))
record.setIsr(Replicas.toList(newIsr));
+ if (record.leader() != NO_LEADER_CHANGE || record.isr() != null) {
+ records.add(new ApiMessageAndVersion(record, (short) 0));
+ }
+ }
+ if (records.size() != oldSize) {
+ if (log.isDebugEnabled()) {
+ StringBuilder bld = new StringBuilder();
+ String prefix = "";
+ for (ListIterator<ApiMessageAndVersion> iter =
records.listIterator(oldSize);
+ iter.hasNext(); ) {
+ ApiMessageAndVersion apiMessageAndVersion = iter.next();
+ PartitionChangeRecord record = (PartitionChangeRecord)
apiMessageAndVersion.message();
+
bld.append(prefix).append(topics.get(record.topicId()).name).append("-").
+ append(record.partitionId());
+ prefix = ", ";
+ }
+ log.debug("{}: changing partition(s): {}", context,
bld.toString());
+ } else if (log.isInfoEnabled()) {
+ log.info("{}: changing {} partition(s)", context,
records.size() - oldSize);
+ }
+ }
+ }
+
class ReplicationControlIterator implements
Iterator<List<ApiMessageAndVersion>> {
private final long epoch;
private final Iterator<TopicControlInfo> iterator;
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java
b/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java
index 525bf1e..6510ee5 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java
@@ -101,9 +101,9 @@ public class BrokersToIsrsTest {
assertEquals(toSet(new TopicIdPartition(UUIDS[0], 2)),
toSet(brokersToIsrs.iterator(3, true)));
assertEquals(toSet(), toSet(brokersToIsrs.iterator(2, true)));
- assertEquals(toSet(), toSet(brokersToIsrs.noLeaderIterator()));
+ assertEquals(toSet(), toSet(brokersToIsrs.partitionsWithNoLeader()));
brokersToIsrs.update(UUIDS[0], 2, new int[]{1, 2, 3}, new int[]{1, 2,
3}, 3, -1);
assertEquals(toSet(new TopicIdPartition(UUIDS[0], 2)),
- toSet(brokersToIsrs.noLeaderIterator()));
+ toSet(brokersToIsrs.partitionsWithNoLeader()));
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index d0588a0..8c64cec 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -185,7 +185,7 @@ public class QuorumControllerTest {
topicPartitionFuture = active.appendReadEvent(
"debugGetPartition", () -> {
Iterator<TopicIdPartition> iterator = active.
-
replicationControl().brokersToIsrs().noLeaderIterator();
+
replicationControl().brokersToIsrs().partitionsWithNoLeader();
assertTrue(iterator.hasNext());
return iterator.next();
});
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index e524581..2a456be 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -34,6 +34,7 @@ import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
@@ -105,6 +106,27 @@ public class ReplicationControlManagerTest {
ReplicationControlTestContext() {
clusterControl.activate();
}
+
+ CreatableTopicResult createTestTopic(String name, int[][] replicas)
throws Exception {
+ assertFalse(replicas.length == 0);
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ CreatableTopic topic = new CreatableTopic().setName(name);
+ topic.setNumPartitions(-1).setReplicationFactor((short) -1);
+ for (int i = 0; i < replicas.length; i++) {
+ topic.assignments().add(new CreatableReplicaAssignment().
+
setPartitionIndex(i).setBrokerIds(Replicas.toList(replicas[i])));
+ }
+ request.topics().add(topic);
+ ControllerResult<CreateTopicsResponseData> result =
+ replicationControl.createTopics(request);
+ CreatableTopicResult topicResult =
result.response().topics().find(name);
+ assertNotNull(topicResult);
+ assertEquals((short) 0, topicResult.errorCode());
+ assertEquals(replicas.length, topicResult.numPartitions());
+ assertEquals(replicas[0].length, topicResult.replicationFactor());
+ replay(result.records());
+ return topicResult;
+ }
}
private static void registerBroker(int brokerId,
ReplicationControlTestContext ctx) {
@@ -125,7 +147,7 @@ public class ReplicationControlManagerTest {
setBrokerId(brokerId).setBrokerEpoch(brokerId +
100).setCurrentMetadataOffset(1).
setWantFence(false).setWantShutDown(false), 0);
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
result.response());
- ControllerTestUtils.replayAll(ctx.clusterControl, result.records());
+ ctx.replay(result.records());
}
@Test
@@ -157,7 +179,7 @@ public class ReplicationControlManagerTest {
setErrorMessage(null).setErrorCode((short) 0).
setTopicId(result2.response().topics().find("foo").topicId()));
assertEquals(expectedResponse2, result2.response());
- ControllerTestUtils.replayAll(replicationControl, result2.records());
+ ctx.replay(result2.records());
assertEquals(new PartitionControlInfo(new int[] {2, 0, 1},
new int[] {2, 0, 1}, null, null, 2, 0, 0),
replicationControl.getPartition(
@@ -197,29 +219,6 @@ public class ReplicationControlManagerTest {
assertEquals(expectedTopicErrors, topicErrors);
}
- private static CreatableTopicResult createTestTopic(
- ReplicationControlManager replicationControl, String name,
- int[][] replicas) throws Exception {
- assertFalse(replicas.length == 0);
- CreateTopicsRequestData request = new CreateTopicsRequestData();
- CreatableTopic topic = new CreatableTopic().setName(name);
- topic.setNumPartitions(-1).setReplicationFactor((short) -1);
- for (int i = 0; i < replicas.length; i++) {
- topic.assignments().add(new CreatableReplicaAssignment().
-
setPartitionIndex(i).setBrokerIds(Replicas.toList(replicas[i])));
- }
- request.topics().add(topic);
- ControllerResult<CreateTopicsResponseData> result =
- replicationControl.createTopics(request);
- CreatableTopicResult topicResult =
result.response().topics().find(name);
- assertNotNull(topicResult);
- assertEquals((short) 0, topicResult.errorCode());
- assertEquals(replicas.length, topicResult.numPartitions());
- assertEquals(replicas[0].length, topicResult.replicationFactor());
- ControllerTestUtils.replayAll(replicationControl, result.records());
- return topicResult;
- }
-
@Test
public void testRemoveLeaderships() throws Exception {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
@@ -228,7 +227,7 @@ public class ReplicationControlManagerTest {
registerBroker(i, ctx);
unfenceBroker(i, ctx);
}
- CreatableTopicResult result = createTestTopic(replicationControl,
"foo",
+ CreatableTopicResult result = ctx.createTestTopic("foo",
new int[][] {
new int[] {0, 1, 2},
new int[] {1, 2, 3},
@@ -241,8 +240,8 @@ public class ReplicationControlManagerTest {
assertEquals(expectedPartitions, ControllerTestUtils.
iteratorToSet(replicationControl.brokersToIsrs().iterator(0,
true)));
List<ApiMessageAndVersion> records = new ArrayList<>();
- replicationControl.handleNodeDeactivated(0, records);
- ControllerTestUtils.replayAll(replicationControl, records);
+ replicationControl.handleBrokerFenced(0, records);
+ ctx.replay(records);
assertEquals(Collections.emptySet(), ControllerTestUtils.
iteratorToSet(replicationControl.brokersToIsrs().iterator(0,
true)));
}
@@ -255,7 +254,7 @@ public class ReplicationControlManagerTest {
registerBroker(i, ctx);
unfenceBroker(i, ctx);
}
- CreatableTopicResult createTopicResult =
createTestTopic(replicationControl, "foo",
+ CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
new int[][] {new int[] {0, 1, 2}});
TopicIdPartition topicIdPartition = new
TopicIdPartition(createTopicResult.topicId(), 0);
@@ -287,7 +286,7 @@ public class ReplicationControlManagerTest {
registerBroker(i, ctx);
unfenceBroker(i, ctx);
}
- CreatableTopicResult createTopicResult =
createTestTopic(replicationControl, "foo",
+ CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
new int[][] {new int[] {0, 1, 2}});
TopicIdPartition topicIdPartition = new
TopicIdPartition(createTopicResult.topicId(), 0);
@@ -652,4 +651,42 @@ public class ReplicationControlManagerTest {
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2),
OptionalInt.of(3))).getMessage());
}
+
+ @Test
+ public void testElectionWasClean() {
+ assertTrue(ReplicationControlManager.electionWasClean(1, new int[] {1,
2}));
+ assertFalse(ReplicationControlManager.electionWasClean(1, new int[]
{0, 2}));
+ assertFalse(ReplicationControlManager.electionWasClean(1, new int[]
{}));
+ assertTrue(ReplicationControlManager.electionWasClean(3, new int[] {1,
2, 3, 4, 5, 6}));
+ }
+
+ @Test
+ public void testPartitionControlInfoMergeAndDiff() {
+ PartitionControlInfo a = new PartitionControlInfo(
+ new int[]{1, 2, 3}, new int[]{1, 2}, null, null, 1, 0, 0);
+ PartitionControlInfo b = new PartitionControlInfo(
+ new int[]{1, 2, 3}, new int[]{3}, null, null, 3, 1, 1);
+ PartitionControlInfo c = new PartitionControlInfo(
+ new int[]{1, 2, 3}, new int[]{1}, null, null, 1, 0, 1);
+ assertEquals(b, a.merge(new PartitionChangeRecord().
+ setLeader(3).setIsr(Arrays.asList(3))));
+ assertEquals("isr: [1, 2] -> [3], leader: 1 -> 3, leaderEpoch: 0 -> 1,
partitionEpoch: 0 -> 1",
+ b.diff(a));
+ assertEquals("isr: [1, 2] -> [1], partitionEpoch: 0 -> 1",
+ c.diff(a));
+ }
+
+ @Test
+ public void testBestLeader() {
+ assertEquals(2, ReplicationControlManager.bestLeader(
+ new int[]{1, 2, 3, 4}, new int[]{4, 2, 3}, false, __ -> true));
+ assertEquals(3, ReplicationControlManager.bestLeader(
+ new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, __ -> true));
+ assertEquals(4, ReplicationControlManager.bestLeader(
+ new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, r -> r == 4));
+ assertEquals(-1, ReplicationControlManager.bestLeader(
+ new int[]{3, 4, 5}, new int[]{1, 2}, false, r -> r == 4));
+ assertEquals(4, ReplicationControlManager.bestLeader(
+ new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4));
+ }
}