[GitHub] [kafka] showuon commented on a diff in pull request #14051: KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently

2023-07-23 Thread via GitHub


showuon commented on code in PR #14051:
URL: https://github.com/apache/kafka/pull/14051#discussion_r1271666143


##
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##
@@ -1290,6 +1290,56 @@ class PartitionTest extends AbstractPartitionTest {
 )
   }
 
+  @Test
+  def testIsReplicaIsrEligibleWithEmptyReplicaMap(): Unit = {
+    val mockMetadataCache = mock(classOf[KRaftMetadataCache])
+    val partition = spy(new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      () => defaultBrokerEpoch(brokerId),
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      mockMetadataCache,
+      logManager,
+      alterPartitionManager))
+
+    when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
+      .thenReturn(None)
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 6, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+
+    val initializeTimeMs = time.milliseconds()
+    assertTrue(partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(controllerEpoch)
+        .setLeader(brokerId)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(List[Integer](brokerId).asJava)
+        .setPartitionEpoch(1)
+        .setReplicas(replicas)
+        .setIsNew(true),
+      offsetCheckpoints, None), "Expected become leader transition to succeed")
+
+    doAnswer(_ => {
+      // simulate topic is deleted at the moment
+      partition.delete()
+      val replica = new Replica(remoteBrokerId, topicPartition)
+      partition.updateFollowerFetchState(replica, mock(classOf[LogOffsetMetadata]), 0, initializeTimeMs, 0, 0)
+      mock(classOf[LogReadInfo])
+    }).when(partition).fetchRecords(any(), any(), anyLong(), anyInt(), anyBoolean(), anyBoolean())
+
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 3L)

Review Comment:
   nit: `assertDoesNotThrow(() => fetchFollower...)` would make the intent much clearer.
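
   For illustration, a minimal sketch of that suggestion, assuming JUnit 5 is on the classpath. `fetchFollowerStandIn` is a hypothetical placeholder for the `fetchFollower` test helper in the diff, not the real helper, and the explicit `Executable` type is just one way to pick the intended `assertDoesNotThrow` overload:

```scala
import org.junit.jupiter.api.Assertions.assertDoesNotThrow
import org.junit.jupiter.api.function.Executable

object AssertDoesNotThrowSketch {
  // Hypothetical stand-in for the fetchFollower(...) helper used in PartitionTest.
  def fetchFollowerStandIn(replicaId: Int, fetchOffset: Long): String =
    s"replica=$replicaId offset=$fetchOffset"

  def main(args: Array[String]): Unit = {
    // Typing the lambda as Executable selects the void-returning overload,
    // so the ThrowingSupplier overload is never a candidate.
    val action: Executable = () => fetchFollowerStandIn(replicaId = 1, fetchOffset = 3L)

    // The assertion states the expectation explicitly: the call must complete
    // without throwing (for example, without an NPE).
    assertDoesNotThrow(action)
  }
}
```

   Wrapping the call this way documents that "no exception" is the condition under test, instead of relying on the test simply running to completion.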



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #14051: KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently

2023-07-19 Thread via GitHub


showuon commented on code in PR #14051:
URL: https://github.com/apache/kafka/pull/14051#discussion_r1268862349


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -998,7 +998,13 @@ class Partition(val topicPartition: TopicPartition,
   // 3. Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch
   //request broker epoch is -1 which bypasses the epoch verification.
   case kRaftMetadataCache: KRaftMetadataCache =>
-val storedBrokerEpoch = remoteReplicasMap.get(followerReplicaId).stateSnapshot.brokerEpoch
+val mayBeReplica = getReplica(followerReplicaId)
+// The topic is already deleted and we don't have any replica information. In this case, we can return false
+// so as to avoid NPE
+if (mayBeReplica.isEmpty) {
+  return false

Review Comment:
   We should log something here. Maybe:
   `warn(s"The replica state of replica ID:[$followerReplicaId] doesn't exist in the leader node. It might be because the topic is already deleted.")`
   WDYT?
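
   To make the idea concrete, here is a simplified, standard-library-only sketch of the guarded lookup with a warning. `ReplicaState`, `isEpochEligible`, the `TrieMap`-backed map, and the in-memory `warn` are all hypothetical stand-ins, not Kafka's actual types or logging:

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ListBuffer

object ReplicaLookupSketch {
  // Hypothetical, simplified stand-in for a follower's replica state.
  final case class ReplicaState(brokerEpoch: Long)

  // Stand-in for the partition's replica map; deleting the topic is assumed to empty it.
  val remoteReplicasMap: TrieMap[Int, ReplicaState] = TrieMap.empty

  // Collects warnings in memory so the sketch needs no logging library.
  val warnings: ListBuffer[String] = ListBuffer.empty
  def warn(msg: String): Unit = warnings += msg

  // Returns None instead of null when the replica is unknown.
  def getReplica(replicaId: Int): Option[ReplicaState] = remoteReplicasMap.get(replicaId)

  // Simplified epoch check: a missing replica is logged and treated as ineligible.
  def isEpochEligible(followerReplicaId: Int, cachedBrokerEpoch: Long): Boolean =
    getReplica(followerReplicaId) match {
      case None =>
        warn(s"The replica state of replica ID:[$followerReplicaId] doesn't exist in the leader node. " +
          "It might be because the topic is already deleted.")
        false
      case Some(state) =>
        // An epoch of -1 bypasses verification; otherwise the epochs must match.
        state.brokerEpoch == -1L || state.brokerEpoch == cachedBrokerEpoch
    }
}
```

   Pattern matching on the `Option` keeps the missing-replica case and the warning in one place and avoids an explicit `return`.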


