[ https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398124#comment-16398124 ]
ASF GitHub Bot commented on KAFKA-3978: --------------------------------------- hachikuji closed pull request #4695: KAFKA-3978; highwatermark should always be positive URL: https://github.com/apache/kafka/pull/4695 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 3b97671524d..68faf00c079 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -460,7 +460,11 @@ class Partition(val topic: String, }.map(_.logEndOffset) val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering) val oldHighWatermark = leaderReplica.highWatermark - if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) { + + // Ensure that the high watermark increases monotonically. We also update the high watermark when the new + // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment. 
+ if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || + (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) { leaderReplica.highWatermark = newHighWatermark debug(s"High watermark updated to $newHighWatermark") true diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index e41e389e22d..030e5b7eb58 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -138,6 +138,9 @@ class Replica(val brokerId: Int, def highWatermark_=(newHighWatermark: LogOffsetMetadata) { if (isLocal) { + if (newHighWatermark.messageOffset < 0) + throw new IllegalArgumentException("High watermark offset should be non-negative") + highWatermarkMetadata = newHighWatermark log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset)) trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]") @@ -165,9 +168,16 @@ class Replica(val brokerId: Int, s"non-local replica $brokerId")) } - def convertHWToLocalOffsetMetadata() = { + /* + * Convert hw to local offset metadata by reading the log at the hw offset. + * If the hw offset is out of range, return the first offset of the first log segment as the offset metadata. 
+ */ + def convertHWToLocalOffsetMetadata() { if (isLocal) { - highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset) + highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset).getOrElse { + val firstOffset = log.get.logSegments.head.baseOffset + new LogOffsetMetadata(firstOffset, firstOffset, 0) + } } else { throw new KafkaException(s"Should not construct complete high watermark on partition $topicPartition's non-local replica $brokerId") } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 257dd8f9ba4..f0050f54aef 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1126,14 +1126,14 @@ class Log(@volatile var dir: File, /** * Given a message offset, find its corresponding offset metadata in the log. - * If the message offset is out of range, return unknown offset metadata + * If the message offset is out of range, return None to the caller. 
*/ - def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = { + def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = { try { val fetchDataInfo = readUncommitted(offset, 1) - fetchDataInfo.fetchOffsetMetadata + Some(fetchDataInfo.fetchOffsetMetadata) } catch { - case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata + case _: OffsetOutOfRangeException => None } } diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index 2a24a37f151..0c41519d211 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -76,6 +76,32 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { super.tearDown() } + @Test + def testHwAfterPartitionReassignment(): Unit = { + //Given a single replica on server 100 + startBrokers(Seq(100, 101, 102)) + adminClient = createAdminClient(servers) + createTopic(zkClient, topicName, Map(0 -> Seq(100)), servers = servers) + + val topicPartition = new TopicPartition(topicName, 0) + val leaderServer = servers.find(_.config.brokerId == 100).get + leaderServer.replicaManager.logManager.truncateFullyAndStartAt(topicPartition, 100L, false) + + val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101, 102]}]}""" + ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle) + + val newLeaderServer = servers.find(_.config.brokerId == 101).get + + TestUtils.waitUntilTrue ( + () => newLeaderServer.replicaManager.getPartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined, + "broker 101 should be the new leader", pause = 1L + ) + + assertEquals(100, newLeaderServer.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset) + servers.foreach(server => 
+ waitUntilTrue(() => server.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset == 100, "")) + } + + @Test def shouldMoveSinglePartition(): Unit = { //Given a single replica on server 100 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Cannot truncate to a negative offset (-1) exception at broker startup > --------------------------------------------------------------------- > > Key: KAFKA-3978 > URL: https://issues.apache.org/jira/browse/KAFKA-3978 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.10.0.0 > Environment: 3.13.0-87-generic > Reporter: Juho Mäkinen > Assignee: Dong Lin > Priority: Critical > Labels: reliability, startup > > During the broker startup sequence, the broker's server.log shows this exception. > The problem persists after multiple restarts and also on another broker in the > cluster. 
> {code} > INFO [Socket Server on Broker 1002], Started 1 acceptor threads > (kafka.network.SocketServer) > INFO [Socket Server on Broker 1002], Started 1 acceptor threads > (kafka.network.SocketServer) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [GroupCoordinator 1002]: Starting up. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Starting up. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Startup complete. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Startup complete. > (kafka.coordinator.GroupCoordinator) > INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 > milliseconds. (kafka.coordinator.GroupMetadataManager) > INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 > milliseconds. 
(kafka.coordinator.GroupMetadataManager) > INFO [ThrottledRequestReaper-Produce], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Produce], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Fetch], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Fetch], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO Will not load MX4J, mx4j-tools.jar is not in the classpath > (kafka.utils.Mx4jLoader$) > INFO Will not load MX4J, mx4j-tools.jar is not in the classpath > (kafka.utils.Mx4jLoader$) > INFO Creating /brokers/ids/1002 (is it secure? false) > (kafka.utils.ZKCheckedEphemeral) > INFO Creating /brokers/ids/1002 (is it secure? false) > (kafka.utils.ZKCheckedEphemeral) > INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) > INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) > INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: > PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils) > INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: > PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils) > INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser) > INFO Kafka commitId : b8642491e78c5a13 > (org.apache.kafka.common.utils.AppInfoParser) > INFO [Kafka Server 1002], started (kafka.server.KafkaServer) > INFO [Kafka Server 1002], started (kafka.server.KafkaServer) > Error when handling request > {controller_id=1004,controller_epoch=1,partition_states=[..REALLY LONG OUTPUT > SNIPPED AWAY..], > live_leaders=[{id=1004,host=172.16.6.187,port=9092},{id=1003,host=172.16.2.21,port=9092}]} > (kafka.server.KafkaApis) > ERROR java.lang.IllegalArgumentException: Cannot truncate to a negative > offset (-1). 
> at kafka.log.Log.truncateTo(Log.scala:731) > at > kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:288) > at > kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:280) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > at > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) > at kafka.log.LogManager.truncateTo(LogManager.scala:280) > at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:802) > at > kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:648) > at > kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:144) > at kafka.server.KafkaApis.handle(KafkaApis.scala:80) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)