junrao commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689659905
##########
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##########
@@ -162,4 +163,39 @@ public boolean topicWasDeleted(String topicName) {
public Set<Uuid> deletedTopicIds() {
return deletedTopicIds;
}
+
+ /**
+ * Find the topic partitions that have change base on the replica given.
Review comment:
base => based
##########
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##########
@@ -162,4 +163,39 @@ public boolean topicWasDeleted(String topicName) {
public Set<Uuid> deletedTopicIds() {
return deletedTopicIds;
}
+
+ /**
+ * Find the topic partitions that have change base on the replica given.
+ *
+ * The changes identified are:
+ * 1. topic partitions for which the broker is not a replica anymore
+ * 2. topic partitions for which the broker is now the leader
+ * 3. topic partitions for which the broker is now a follower
+ *
+ * @param brokerId the broker id
+ * @return the list of topic partitions which the broker should remove,
become leader or become follower.
+ */
+ public LocalReplicaChanges localChanges(int brokerId) {
+ Set<TopicPartition> deletes = new HashSet<>();
+ Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new
HashMap<>();
+ Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new
HashMap<>();
+
+ for (TopicDelta delta : changedTopics.values()) {
+ LocalReplicaChanges changes = delta.localChanges(brokerId);
+
+ deletes.addAll(changes.deletes());
+ leaders.putAll(changes.leaders());
+ followers.putAll(changes.followers());
+ }
+
+ // Add all of the deleted topic partitions to the set of locally
removed partitions
+ deletedTopicIds().forEach(topicId -> {
+ TopicImage topicImage = image().getTopic(topicId);
+ topicImage.partitions().keySet().forEach(partitionId -> {
+ deletes.add(new TopicPartition(topicImage.name(),
partitionId));
Review comment:
Should we further check that the deleted partition has a replica on
brokerId before adding it to deletes?
##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3020,34 +2949,183 @@ class ReplicaManagerTest {
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
+ @Test
+ def testDeltaFollowerToNotReplica(): Unit = {
+ val localId = 1
+ val otherId = localId + 1
+ val topicPartition = new TopicPartition("foo", 0)
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time), localId)
+
+ try {
+ // Make the local replica the follower
+ val followerTopicsDelta = topicsCreateDelta(localId, false)
+ val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+ replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+ // Check the state of that partition and fetcher
+ val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ assertFalse(followerPartition.isLeader)
+ assertEquals(0, followerPartition.getLeaderEpoch)
+
+ val fetcher =
replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+ assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)),
fetcher.map(_.sourceBroker))
+
+ // Apply changes that remove replica
+ val notReplicaTopicsDelta =
topicsChangeDelta(followerMetadataImage.topics(), otherId, true)
+ val notReplicaMetadataImage =
imageFromTopics(notReplicaTopicsDelta.apply())
+ replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
+
+ // Check that the partition was removed
+ assertEquals(HostedPartition.None,
replicaManager.getPartition(topicPartition))
+ assertEquals(None,
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+ assertEquals(None, replicaManager.logManager.getLog(topicPartition))
+ } finally {
+ replicaManager.shutdown()
+ }
+
+ TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+ }
+
+ @Test
+ def testDeltaFollowerRemovedTopic(): Unit = {
+ val localId = 1
+ val otherId = localId + 1
+ val topicPartition = new TopicPartition("foo", 0)
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time), localId)
+
+ try {
+ // Make the local replica the follower
+ val followerTopicsDelta = topicsCreateDelta(localId, false)
+ val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+ replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+ // Check the state of that partition and fetcher
+ val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ assertFalse(followerPartition.isLeader)
+ assertEquals(0, followerPartition.getLeaderEpoch)
+
+ val fetcher =
replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+ assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)),
fetcher.map(_.sourceBroker))
+
+ // Apply changes that remove topic and replica
+ val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics())
+ val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
+ replicaManager.applyDelta(removeMetadataImage, removeTopicsDelta)
+
+ // Check that the partition was removed
+ assertEquals(HostedPartition.None,
replicaManager.getPartition(topicPartition))
+ assertEquals(None,
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+ assertEquals(None, replicaManager.logManager.getLog(topicPartition))
+ } finally {
+ replicaManager.shutdown()
+ }
+
+ TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+ }
+
+ @Test
+ def testDeltaLeaderToNotReplica(): Unit = {
+ val localId = 1
+ val otherId = localId + 1
+ val topicPartition = new TopicPartition("foo", 0)
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time), localId)
+
+ try {
+ // Make the local replica the leader
+ val leaderTopicsDelta = topicsCreateDelta(localId, true)
+ val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+ replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
+
+ // Check the state of that partition and fetcher
+ val HostedPartition.Online(leaderPartition) =
replicaManager.getPartition(topicPartition)
+ assertTrue(leaderPartition.isLeader)
+ assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
+ assertEquals(0, leaderPartition.getLeaderEpoch)
+
+ assertEquals(None,
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+
+ // Apply changes that remove replica
+ val notReplicaTopicsDelta =
topicsChangeDelta(leaderMetadataImage.topics(), otherId, true)
+ val notReplicaMetadataImage =
imageFromTopics(notReplicaTopicsDelta.apply())
+ replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
+
+ // Check that the partition was removed
Review comment:
In RaftClusterTest.testCreateClusterAndPerformReassignment(), should we
add the same check to verify that the old replicas are removed after
reassignment?
##########
File path: metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
##########
@@ -93,43 +93,49 @@ public TopicImage apply() {
}
/**
- * Find the partitions that we are now leading, whose partition epoch has
changed.
+ * Find the partitions that have change base on the replica given.
Review comment:
base => based
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]