[ 
https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398124#comment-16398124
 ] 

ASF GitHub Bot commented on KAFKA-3978:
---------------------------------------

hachikuji closed pull request #4695: KAFKA-3978; highwatermark should always be 
positive
URL: https://github.com/apache/kafka/pull/4695
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 3b97671524d..68faf00c079 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -460,7 +460,11 @@ class Partition(val topic: String,
     }.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min(new 
LogOffsetMetadata.OffsetOrdering)
     val oldHighWatermark = leaderReplica.highWatermark
-    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || 
oldHighWatermark.onOlderSegment(newHighWatermark)) {
+
+    // Ensure that the high watermark increases monotonically. We also update 
the high watermark when the new
+    // offset metadata is on a newer segment, which occurs whenever the log is 
rolled to a new segment.
+    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
+      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && 
oldHighWatermark.onOlderSegment(newHighWatermark))) {
       leaderReplica.highWatermark = newHighWatermark
       debug(s"High watermark updated to $newHighWatermark")
       true
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala 
b/core/src/main/scala/kafka/cluster/Replica.scala
index e41e389e22d..030e5b7eb58 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -138,6 +138,9 @@ class Replica(val brokerId: Int,
 
   def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
     if (isLocal) {
+      if (newHighWatermark.messageOffset < 0)
+        throw new IllegalArgumentException("High watermark offset should be 
non-negative")
+
       highWatermarkMetadata = newHighWatermark
       log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset))
       trace(s"Setting high watermark for replica $brokerId partition 
$topicPartition to [$newHighWatermark]")
@@ -165,9 +168,16 @@ class Replica(val brokerId: Int,
       s"non-local replica $brokerId"))
   }
 
-  def convertHWToLocalOffsetMetadata() = {
+  /*
+   * Convert hw to local offset metadata by reading the log at the hw offset.
+   * If the hw offset is out of range, return the first offset of the first 
log segment as the offset metadata.
+   */
+  def convertHWToLocalOffsetMetadata() {
     if (isLocal) {
-      highWatermarkMetadata = 
log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
+      highWatermarkMetadata = 
log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset).getOrElse {
+        val firstOffset = log.get.logSegments.head.baseOffset
+        new LogOffsetMetadata(firstOffset, firstOffset, 0)
+      }
     } else {
       throw new KafkaException(s"Should not construct complete high watermark 
on partition $topicPartition's non-local replica $brokerId")
     }
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 257dd8f9ba4..f0050f54aef 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1126,14 +1126,14 @@ class Log(@volatile var dir: File,
 
   /**
    * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, return unknown offset metadata
+   * If the message offset is out of range, return None to the caller.
    */
-  def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
+  def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {
     try {
       val fetchDataInfo = readUncommitted(offset, 1)
-      fetchDataInfo.fetchOffsetMetadata
+      Some(fetchDataInfo.fetchOffsetMetadata)
     } catch {
-      case _: OffsetOutOfRangeException => 
LogOffsetMetadata.UnknownOffsetMetadata
+      case _: OffsetOutOfRangeException => None
     }
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala 
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 2a24a37f151..0c41519d211 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -76,6 +76,32 @@ class ReassignPartitionsClusterTest extends 
ZooKeeperTestHarness with Logging {
     super.tearDown()
   }
 
+  @Test
+  def testHwAfterPartitionReassignment(): Unit = {
+    //Given a single replica on server 100
+    startBrokers(Seq(100, 101, 102))
+    adminClient = createAdminClient(servers)
+    createTopic(zkClient, topicName, Map(0 -> Seq(100)), servers = servers)
+
+    val topicPartition = new TopicPartition(topicName, 0)
+    val leaderServer = servers.find(_.config.brokerId == 100).get
+    
leaderServer.replicaManager.logManager.truncateFullyAndStartAt(topicPartition, 
100L, false)
+
+    val topicJson: String = 
s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101,
 102]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), 
topicJson, NoThrottle)
+
+    val newLeaderServer = servers.find(_.config.brokerId == 101).get
+
+    TestUtils.waitUntilTrue (
+      () => 
newLeaderServer.replicaManager.getPartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined,
+      "broker 101 should be the new leader", pause = 1L
+    )
+
+    assertEquals(100, 
newLeaderServer.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset)
+    servers.foreach(server => waitUntilTrue(() => 
server.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset
 == 100, ""))
+  }
+
+
   @Test
   def shouldMoveSinglePartition(): Unit = {
     //Given a single replica on server 100


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cannot truncate to a negative offset (-1) exception at broker startup
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-3978
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3978
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.0.0
>         Environment: 3.13.0-87-generic 
>            Reporter: Juho Mäkinen
>            Assignee: Dong Lin
>            Priority: Critical
>              Labels: reliability, startup
>
> During the broker startup sequence, the broker's server.log shows this 
> exception. The problem persists after multiple restarts and also occurs on 
> another broker in the cluster.
> {code}
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser)
> INFO Kafka commitId : b8642491e78c5a13 
> (org.apache.kafka.common.utils.AppInfoParser)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> Error when handling request 
> {controller_id=1004,controller_epoch=1,partition_states=[..REALLY LONG OUTPUT 
> SNIPPED AWAY..], 
> live_leaders=[{id=1004,host=172.16.6.187,port=9092},{id=1003,host=172.16.2.21,port=9092}]}
>  (kafka.server.KafkaApis)
> ERROR java.lang.IllegalArgumentException: Cannot truncate to a negative 
> offset (-1).
>         at kafka.log.Log.truncateTo(Log.scala:731)
>         at 
> kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:288)
>         at 
> kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:280)
>         at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>         at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>         at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>         at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>         at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>         at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>         at kafka.log.LogManager.truncateTo(LogManager.scala:280)
>         at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:802)
>         at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:648)
>         at 
> kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:144)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:80)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>         at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to