junrao commented on code in PR #17796:
URL: https://github.com/apache/kafka/pull/17796#discussion_r1868150531
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -6798,6 +6798,105 @@ class ReplicaManagerTest {
assertEquals(Double.NaN, maxMetric.metricValue)
}
+ @Test
+ def testBecomeFollowerInvokeOnBecomingFollowerListener(): Unit = {
+ val localId = 1
+ val topicPartition = new TopicPartition("foo", 0)
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time), localId)
+ // Attach listener to partition.
+ val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
+ replicaManager.createPartition(topicPartition).createLogIfNotExists(isNew
= false, isFutureReplica = false, offsetCheckpoints, None)
+ val listener = new MockPartitionListener
+ assertTrue(replicaManager.maybeAddListener(topicPartition, listener))
+ listener.verify()
+
+ try {
+ // Make the local replica the leader
+ val leaderTopicsDelta = topicsCreateDelta(localId, true)
+ val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+
+ replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
+
+ // Check the state of that partition and fetcher
+ val HostedPartition.Online(leaderPartition) =
replicaManager.getPartition(topicPartition)
+ assertTrue(leaderPartition.isLeader)
+ assertEquals(0, leaderPartition.getLeaderEpoch)
+ // On becoming follower listener should not be invoked yet.
+ listener.verify()
+
+ // Change the local replica to follower
+ val followerTopicsDelta =
topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
+ val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
+
+ // On becoming follower listener should be invoked.
+ listener.verify(expectedFollower = true)
+
+ // Check the state of that partition.
+ val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ assertFalse(followerPartition.isLeader)
+ assertEquals(1, followerPartition.getLeaderEpoch)
+ } finally {
+ replicaManager.shutdown(checkpointHW = false)
+ }
+ }
+
+ @Test
+ def testBecomeFollowerNotInvokeOnBecomingFollowerListenerOnZk(): Unit = {
Review Comment:
No need to add this test since `replicaManager.becomeLeaderOrFollower` will
eventually be removed.
##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2496,6 +2498,98 @@ public void
testReplicaManagerFetchMultipleSharePartitionsException() {
assertTrue(partitionCacheMap.isEmpty());
}
+ @Test
+ public void testListenerRegistration() {
+ String groupId = "grp";
+ Uuid memberId = Uuid.randomUuid();
+
+ TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
+ TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("bar", 0));
+ Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+ partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+ partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+ Partition partition = mockPartition();
+
when(mockReplicaManager.getPartitionOrException(Mockito.any())).thenReturn(partition);
+
+ SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
+ .withReplicaManager(mockReplicaManager)
+ .withTimer(mockTimer)
+ .build();
+
+ sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, partitionMaxBytes);
+ // Validate that the listener is registered.
+ verify(mockReplicaManager, times(2)).maybeAddListener(any(), any());
+ }
+
+ @Test
+ public void testSharePartitionListenerOnFailed() {
+ SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
+ new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo",
0)));
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager,
partitionCacheMap);
+ testSharePartitionListener(sharePartitionKey, partitionCacheMap,
mockReplicaManager, partitionListener::onFailed);
+ }
+
+ @Test
+ public void testSharePartitionListenerOnDeleted() {
+ SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
+ new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo",
0)));
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager,
partitionCacheMap);
+ testSharePartitionListener(sharePartitionKey, partitionCacheMap,
mockReplicaManager, partitionListener::onDeleted);
+ }
+
+ @Test
+ public void testSharePartitionListenerOnBecomingFollower() {
+ SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
+ new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo",
0)));
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager,
partitionCacheMap);
+ testSharePartitionListener(sharePartitionKey, partitionCacheMap,
mockReplicaManager, partitionListener::onBecomingFollower);
+ }
+
+ private void testSharePartitionListener(
+ SharePartitionKey sharePartitionKey,
+ Map<SharePartitionKey, SharePartition> partitionCacheMap,
+ ReplicaManager mockReplicaManager,
+ Consumer<TopicPartition> listenerConsumer
+ ) {
+ // Add another share partition to the cache.
+ TopicPartition tp = new TopicPartition("foo", 1);
+ TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
+ SharePartitionKey spk = new SharePartitionKey("grp", tpId);
+
+ SharePartition sp0 = mock(SharePartition.class);
+ SharePartition sp1 = mock(SharePartition.class);
+ partitionCacheMap.put(sharePartitionKey, sp0);
+ partitionCacheMap.put(spk, sp1);
+
+ // Invoke listener for first share partition.
+
listenerConsumer.accept(sharePartitionKey.topicIdPartition().topicPartition());
+
+ // Validate that the share partition is removed from the cache.
+ assertEquals(1, partitionCacheMap.size());
+ assertFalse(partitionCacheMap.containsKey(sharePartitionKey));
+ verify(sp0, times(1)).markFenced();
+ verify(mockReplicaManager, times(1)).removeListener(any(), any());
+
+ // Invoke listener for second share partition.
+ listenerConsumer.accept(tp);
+ // The second share partition should not be removed as the listener is
attached to single topic partition.
Review Comment:
"attached to single topic partition" => "attached to a different topic
partition"?
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -665,17 +670,78 @@ public void handleFencedSharePartitionException(
// The share partition is fenced hence remove the partition from
map and let the client retry.
// But surface the error to the client so client might take some
action i.e. re-fetch
// the metadata and retry the fetch on new leader.
- SharePartition sharePartition =
partitionCacheMap.remove(sharePartitionKey);
- if (sharePartition != null) {
- sharePartition.markFenced();
- }
+ removeSharePartitionFromCache(sharePartitionKey,
partitionCacheMap, replicaManager);
}
}
private SharePartitionKey sharePartitionKey(String groupId,
TopicIdPartition topicIdPartition) {
return new SharePartitionKey(groupId, topicIdPartition);
}
+ private static void removeSharePartitionFromCache(SharePartitionKey
sharePartitionKey,
+ Map<SharePartitionKey, SharePartition> map, ReplicaManager
replicaManager) {
+ SharePartition sharePartition = map.remove(sharePartitionKey);
+ if (sharePartition != null) {
+ sharePartition.markFenced();
+
replicaManager.removeListener(sharePartitionKey.topicIdPartition().topicPartition(),
sharePartition.listener());
+ }
+ }
+
+ /**
+ * The SharePartitionListener is used to listen for partition events. The
share partition is associated with
+ * the topic-partition, we need to handle the partition events for the
share partition.
+ * <p>
+ * The partition cache map stores share partitions against share partition
key which comprises
+ * group and topic-partition. Instead of maintaining a separate map for
topic-partition to share partitions,
+ * we can maintain the share partition key in the listener and create a
new listener for each share partition.
+ */
+ static class SharePartitionListener implements PartitionListener {
Review Comment:
Would it be better to make this a private class? This way, we don't need to
pass in the state from SharePartitionManager.
##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2496,6 +2498,98 @@ public void
testReplicaManagerFetchMultipleSharePartitionsException() {
assertTrue(partitionCacheMap.isEmpty());
}
+ @Test
+ public void testListenerRegistration() {
+ String groupId = "grp";
+ Uuid memberId = Uuid.randomUuid();
+
+ TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
+ TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("bar", 0));
+ Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+ partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+ partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+ Partition partition = mockPartition();
+
when(mockReplicaManager.getPartitionOrException(Mockito.any())).thenReturn(partition);
+
+ SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
+ .withReplicaManager(mockReplicaManager)
+ .withTimer(mockTimer)
+ .build();
+
+ sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, partitionMaxBytes);
+ // Validate that the listener is registered.
+ verify(mockReplicaManager, times(2)).maybeAddListener(any(), any());
+ }
+
+ @Test
+ public void testSharePartitionListenerOnFailed() {
+ SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
+ new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo",
0)));
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager,
partitionCacheMap);
+ testSharePartitionListener(sharePartitionKey, partitionCacheMap,
mockReplicaManager, partitionListener::onFailed);
+ }
+
+ @Test
+ public void testSharePartitionListenerOnDeleted() {
+ SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
+ new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo",
0)));
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager,
partitionCacheMap);
+ testSharePartitionListener(sharePartitionKey, partitionCacheMap,
mockReplicaManager, partitionListener::onDeleted);
+ }
+
+ @Test
+ public void testSharePartitionListenerOnBecomingFollower() {
+ SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
+ new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo",
0)));
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager,
partitionCacheMap);
+ testSharePartitionListener(sharePartitionKey, partitionCacheMap,
mockReplicaManager, partitionListener::onBecomingFollower);
+ }
+
+ private void testSharePartitionListener(
+ SharePartitionKey sharePartitionKey,
+ Map<SharePartitionKey, SharePartition> partitionCacheMap,
+ ReplicaManager mockReplicaManager,
+ Consumer<TopicPartition> listenerConsumer
+ ) {
+ // Add another share partition to the cache.
+ TopicPartition tp = new TopicPartition("foo", 1);
+ TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
+ SharePartitionKey spk = new SharePartitionKey("grp", tpId);
+
+ SharePartition sp0 = mock(SharePartition.class);
+ SharePartition sp1 = mock(SharePartition.class);
+ partitionCacheMap.put(sharePartitionKey, sp0);
+ partitionCacheMap.put(spk, sp1);
+
+ // Invoke listener for first share partition.
+
listenerConsumer.accept(sharePartitionKey.topicIdPartition().topicPartition());
+
+ // Validate that the share partition is removed from the cache.
+ assertEquals(1, partitionCacheMap.size());
+ assertFalse(partitionCacheMap.containsKey(sharePartitionKey));
+ verify(sp0, times(1)).markFenced();
+ verify(mockReplicaManager, times(1)).removeListener(any(), any());
+
+ // Invoke listener for second share partition.
Review Comment:
second share partition => non-matching share partition?
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -665,17 +670,78 @@ public void handleFencedSharePartitionException(
// The share partition is fenced hence remove the partition from
map and let the client retry.
// But surface the error to the client so client might take some
action i.e. re-fetch
// the metadata and retry the fetch on new leader.
- SharePartition sharePartition =
partitionCacheMap.remove(sharePartitionKey);
- if (sharePartition != null) {
- sharePartition.markFenced();
- }
+ removeSharePartitionFromCache(sharePartitionKey,
partitionCacheMap, replicaManager);
}
}
private SharePartitionKey sharePartitionKey(String groupId,
TopicIdPartition topicIdPartition) {
return new SharePartitionKey(groupId, topicIdPartition);
}
+ private static void removeSharePartitionFromCache(SharePartitionKey
sharePartitionKey,
+ Map<SharePartitionKey, SharePartition> map, ReplicaManager
replicaManager) {
+ SharePartition sharePartition = map.remove(sharePartitionKey);
+ if (sharePartition != null) {
+ sharePartition.markFenced();
+
replicaManager.removeListener(sharePartitionKey.topicIdPartition().topicPartition(),
sharePartition.listener());
+ }
+ }
+
+ /**
+ * The SharePartitionListener is used to listen for partition events. The
share partition is associated with
+ * the topic-partition, we need to handle the partition events for the
share partition.
+ * <p>
+ * The partition cache map stores share partitions against share partition
key which comprises
+ * group and topic-partition. Instead of maintaining a separate map for
topic-partition to share partitions,
+ * we can maintain the share partition key in the listener and create a
new listener for each share partition.
+ */
+ static class SharePartitionListener implements PartitionListener {
+
+ private final SharePartitionKey sharePartitionKey;
+ private final ReplicaManager replicaManager;
+ private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
+
+ SharePartitionListener(
+ SharePartitionKey sharePartitionKey,
+ ReplicaManager replicaManager,
+ Map<SharePartitionKey, SharePartition> partitionCacheMap
+ ) {
+ this.sharePartitionKey = sharePartitionKey;
+ this.replicaManager = replicaManager;
+ this.partitionCacheMap = partitionCacheMap;
+ }
+
+ @Override
+ public void onFailed(TopicPartition topicPartition) {
+ log.debug("The share partition failed listener is invoked for the
topic-partition: {}, share-partition: {}",
+ topicPartition, sharePartitionKey);
+ onUpdate(topicPartition);
+ }
+
+ @Override
+ public void onDeleted(TopicPartition topicPartition) {
+ log.debug("The share partition delete listener is invoked for the
topic-partition: {}, share-partition: {}",
+ topicPartition, sharePartitionKey);
+ onUpdate(topicPartition);
+ }
+
+ @Override
+ public void onBecomingFollower(TopicPartition topicPartition) {
+ log.debug("The share partition leader change listener is invoked
for the topic-partition: {}, share-partition: {}",
Review Comment:
"leader change" => "becoming follower"?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]