This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aeeb7b2 KAFKA-9263 The new hw is added to incorrect log when
ReplicaAlterLogDirsThread is replacing log (fix
PlaintextAdminIntegrationTest.testAlterReplicaLogDirs) (#9423)
aeeb7b2 is described below
commit aeeb7b2f9a9abe8f49543a2278757722e5974cb3
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Wed Dec 2 11:21:28 2020 +0800
KAFKA-9263 The new hw is added to incorrect log when
ReplicaAlterLogDirsThread is replacing log (fix
PlaintextAdminIntegrationTest.testAlterReplicaLogDirs) (#9423)
Reviewers: Jun Rao <[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 62 ++++++++++++-----------
1 file changed, 32 insertions(+), 30 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index f6a9e83..a16a4c1 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -704,7 +704,11 @@ class Partition(val topicPartition: TopicPartition,
// check if the HW of the partition can now be incremented
// since the replica may already be in the ISR and its LEO has just
incremented
val leaderHWIncremented = if (prevFollowerEndOffset !=
followerReplica.logEndOffset) {
- leaderLogIfLocal.exists(leaderLog =>
maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
+ // the leader log may be updated by ReplicaAlterLogDirsThread so the
following method must be in lock of
+ // leaderIsrUpdateLock to prevent adding new hw to invalid log.
+ inReadLock(leaderIsrUpdateLock) {
+ leaderLogIfLocal.exists(leaderLog =>
maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
+ }
} else {
false
}
@@ -865,41 +869,39 @@ class Partition(val topicPartition: TopicPartition,
* committed ISR. However, adding additional replicas to the ISR makes it
more restrictive and therefor safe. We call
* this set the "maximal" ISR. See KIP-497 for more details
*
- * Returns true if the HW was incremented, and false otherwise.
- * Note There is no need to acquire the leaderIsrUpdate lock here
- * since all callers of this private API acquire that lock
+ * Note There is no need to acquire the leaderIsrUpdate lock here since all
callers of this private API acquire that lock
+ *
+ * @return true if the HW was incremented, and false otherwise.
*/
private def maybeIncrementLeaderHW(leaderLog: Log, curTime: Long =
time.milliseconds): Boolean = {
- inReadLock(leaderIsrUpdateLock) {
- // maybeIncrementLeaderHW is in the hot path, the following code is
written to
- // avoid unnecessary collection generation
- var newHighWatermark = leaderLog.logEndOffsetMetadata
- remoteReplicasMap.values.foreach { replica =>
- // Note here we are using the "maximal", see explanation above
- if (replica.logEndOffsetMetadata.messageOffset <
newHighWatermark.messageOffset &&
- (curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs ||
isrState.maximalIsr.contains(replica.brokerId))) {
- newHighWatermark = replica.logEndOffsetMetadata
- }
+ // maybeIncrementLeaderHW is in the hot path, the following code is
written to
+ // avoid unnecessary collection generation
+ var newHighWatermark = leaderLog.logEndOffsetMetadata
+ remoteReplicasMap.values.foreach { replica =>
+ // Note here we are using the "maximal", see explanation above
+ if (replica.logEndOffsetMetadata.messageOffset <
newHighWatermark.messageOffset &&
+ (curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs ||
isrState.maximalIsr.contains(replica.brokerId))) {
+ newHighWatermark = replica.logEndOffsetMetadata
}
+ }
- leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {
- case Some(oldHighWatermark) =>
- debug(s"High watermark updated from $oldHighWatermark to
$newHighWatermark")
- true
+ leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {
+ case Some(oldHighWatermark) =>
+ debug(s"High watermark updated from $oldHighWatermark to
$newHighWatermark")
+ true
- case None =>
- def logEndOffsetString: ((Int, LogOffsetMetadata)) => String = {
- case (brokerId, logEndOffsetMetadata) => s"replica $brokerId:
$logEndOffsetMetadata"
- }
+ case None =>
+ def logEndOffsetString: ((Int, LogOffsetMetadata)) => String = {
+ case (brokerId, logEndOffsetMetadata) => s"replica $brokerId:
$logEndOffsetMetadata"
+ }
- if (isTraceEnabled) {
- val replicaInfo = remoteReplicas.map(replica => (replica.brokerId,
replica.logEndOffsetMetadata)).toSet
- val localLogInfo = (localBrokerId,
localLogOrException.logEndOffsetMetadata)
- trace(s"Skipping update high watermark since new hw
$newHighWatermark is not larger than old value. " +
- s"All current LEOs are ${(replicaInfo +
localLogInfo).map(logEndOffsetString)}")
- }
- false
- }
+ if (isTraceEnabled) {
+ val replicaInfo = remoteReplicas.map(replica => (replica.brokerId,
replica.logEndOffsetMetadata)).toSet
+ val localLogInfo = (localBrokerId,
localLogOrException.logEndOffsetMetadata)
+ trace(s"Skipping update high watermark since new hw
$newHighWatermark is not larger than old value. " +
+ s"All current LEOs are ${(replicaInfo +
localLogInfo).map(logEndOffsetString)}")
+ }
+ false
}
}