This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push: new 77137e9 KAFKA-3978; Ensure high watermark is always positive (#4695) 77137e9 is described below commit 77137e993b15ad4272972f05f61e7faee36a1914 Author: Dong Lin <lindon...@users.noreply.github.com> AuthorDate: Tue Mar 13 22:52:59 2018 -0700 KAFKA-3978; Ensure high watermark is always positive (#4695) Partition high watermark may become -1 if the initial value is out of range. This situation can occur during partition reassignment, for example. The bug was fixed and validated with unit test in this patch. Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> --- core/src/main/scala/kafka/cluster/Partition.scala | 6 ++++- core/src/main/scala/kafka/cluster/Replica.scala | 14 ++++++++++-- core/src/main/scala/kafka/log/Log.scala | 8 +++---- .../admin/ReassignPartitionsClusterTest.scala | 26 ++++++++++++++++++++++ 4 files changed, 47 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 3b97671..68faf00 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -460,7 +460,11 @@ class Partition(val topic: String, }.map(_.logEndOffset) val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering) val oldHighWatermark = leaderReplica.highWatermark - if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) { + + // Ensure that the high watermark increases monotonically. We also update the high watermark when the new + // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment. 
+ if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || + (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) { leaderReplica.highWatermark = newHighWatermark debug(s"High watermark updated to $newHighWatermark") true diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index e41e389..030e5b7 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -138,6 +138,9 @@ class Replica(val brokerId: Int, def highWatermark_=(newHighWatermark: LogOffsetMetadata) { if (isLocal) { + if (newHighWatermark.messageOffset < 0) + throw new IllegalArgumentException("High watermark offset should be non-negative") + highWatermarkMetadata = newHighWatermark log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset)) trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]") @@ -165,9 +168,16 @@ class Replica(val brokerId: Int, s"non-local replica $brokerId")) } - def convertHWToLocalOffsetMetadata() = { + /* + * Convert hw to local offset metadata by reading the log at the hw offset. + * If the hw offset is out of range, return the first offset of the first log segment as the offset metadata. 
+ */ + def convertHWToLocalOffsetMetadata() { if (isLocal) { - highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset) + highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset).getOrElse { + val firstOffset = log.get.logSegments.head.baseOffset + new LogOffsetMetadata(firstOffset, firstOffset, 0) + } } else { throw new KafkaException(s"Should not construct complete high watermark on partition $topicPartition's non-local replica $brokerId") } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 257dd8f..f0050f5 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1126,14 +1126,14 @@ class Log(@volatile var dir: File, /** * Given a message offset, find its corresponding offset metadata in the log. - * If the message offset is out of range, return unknown offset metadata + * If the message offset is out of range, return None to the caller. 
*/ - def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = { + def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = { try { val fetchDataInfo = readUncommitted(offset, 1) - fetchDataInfo.fetchOffsetMetadata + Some(fetchDataInfo.fetchOffsetMetadata) } catch { - case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata + case _: OffsetOutOfRangeException => None } } diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index 2a24a37..0c41519 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -77,6 +77,32 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { } @Test + def testHwAfterPartitionReassignment(): Unit = { + //Given a single replica on server 100 + startBrokers(Seq(100, 101, 102)) + adminClient = createAdminClient(servers) + createTopic(zkClient, topicName, Map(0 -> Seq(100)), servers = servers) + + val topicPartition = new TopicPartition(topicName, 0) + val leaderServer = servers.find(_.config.brokerId == 100).get + leaderServer.replicaManager.logManager.truncateFullyAndStartAt(topicPartition, 100L, false) + + val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101, 102]}]}""" + ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle) + + val newLeaderServer = servers.find(_.config.brokerId == 101).get + + TestUtils.waitUntilTrue ( + () => newLeaderServer.replicaManager.getPartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined, + "broker 101 should be the new leader", pause = 1L + ) + + assertEquals(100, newLeaderServer.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset) + servers.foreach(server => waitUntilTrue(() => 
server.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset == 100, "")) + } + + + @Test def shouldMoveSinglePartition(): Unit = { //Given a single replica on server 100 startBrokers(Seq(100, 101)) -- To stop receiving notification emails like this one, please contact j...@apache.org.