Repository: kafka
Updated Branches:
  refs/heads/trunk 9beafae23 -> 4d467c2ec


KAFKA-725: Return OffsetOutOfRange error from ReplicaManager when non-follower 
attempts reading an offset that's above high watermark.

This should make Log.read behave the same when startOffset is larger than
maxOffset as it does when startOffset is larger than logEndOffset. The current
behavior can result in an IllegalArgumentException from LogSegment if a
consumer attempts to fetch an offset that is above the high watermark but still
present in the leader's log. It seems more correct for Log.read to present the
log to consumers as if it simply ended at maxOffset (the high watermark).

I've described an example scenario in which this happens here:
https://issues.apache.org/jira/browse/KAFKA-725?focusedCommentId=15221673&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15221673

I'm not sure I understand why ReplicaManager sets maxOffset to the high
watermark rather than to high watermark + 1. Isn't the high watermark the last
committed message, and therefore readable by consumers?

Tests passed for me locally on the second try; the first failure seems to have been a flaky test.

Author: Stig Rohde Døssing <[email protected]>

Reviewers: Jiangjie Qin <[email protected]>, Guozhang Wang <[email protected]>

Closes #1178 from srdo/KAFKA-725


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4d467c2e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4d467c2e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4d467c2e

Branch: refs/heads/trunk
Commit: 4d467c2ec275b5659c2da0ca196409dffaa3caf3
Parents: 9beafae
Author: Stig Rohde Døssing <[email protected]>
Authored: Fri Apr 8 09:44:51 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri Apr 8 09:44:51 2016 -0700

----------------------------------------------------------------------
 .../scala/kafka/server/ReplicaManager.scala     |   8 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |   9 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 118 +++++++++++++++----
 3 files changed, 104 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4d467c2e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f050e27..22657f4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -522,8 +522,12 @@ class ReplicaManager(val config: KafkaConfig,
             getReplicaOrException(topic, partition)
 
           // decide whether to only fetch committed data (i.e. messages below 
high watermark)
-          val maxOffsetOpt = if (readOnlyCommitted)
-            Some(localReplica.highWatermark.messageOffset)
+          val maxOffsetOpt = if (readOnlyCommitted) {
+            val maxOffset = localReplica.highWatermark.messageOffset
+            if(offset > maxOffset)
+              throw new OffsetOutOfRangeException("Request for offset %d 
beyond high watermark %d when reading from only committed 
offsets".format(offset, maxOffset))
+            Some(maxOffset)
+          }
           else
             None
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d467c2e/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 4d75d53..3f6a275 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -228,6 +228,7 @@ class LogTest extends JUnitSuite {
   /**
    * Test reading at the boundary of the log, specifically
    * - reading from the logEndOffset should give an empty message set
+   * - reading from the maxOffset should give an empty message set
    * - reading beyond the log end offset should throw an 
OffsetOutOfRangeException
    */
   @Test
@@ -236,19 +237,21 @@ class LogTest extends JUnitSuite {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, 
time.scheduler, time = time)
-    assertEquals("Reading just beyond end of log should produce 0 byte read.", 
0, log.read(1024, 1000).messageSet.sizeInBytes)
+    log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new 
Message("42".getBytes)))
+    assertEquals("Reading just beyond end of log should produce 0 byte read.", 
0, log.read(1025, 1000).messageSet.sizeInBytes)
     try {
-      log.read(0, 1024)
+      log.read(0, 1025)
       fail("Expected exception on invalid read.")
     } catch {
       case e: OffsetOutOfRangeException => "This is good."
     }
     try {
-      log.read(1025, 1000)
+      log.read(1026, 1000)
       fail("Expected exception on invalid read.")
     } catch {
       case e: OffsetOutOfRangeException => // This is good.
     }
+    assertEquals("Reading from maxOffset should produce 0 byte read.", 0, 
log.read(1024, 1000, Some(1024)).messageSet.sizeInBytes)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d467c2e/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ee14af4..c2c670e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -35,8 +35,8 @@ import 
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.easymock.EasyMock
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.Test
+import org.junit.Assert.{assertEquals, assertTrue, assertFalse}
+import org.junit.{Test, Before, After}
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
@@ -44,17 +44,28 @@ import scala.collection.Map
 class ReplicaManagerTest {
 
   val topic = "test-topic"
+  val time = new MockTime()
+  val jTime = new JMockTime
+  val metrics = new Metrics
+  var zkClient : ZkClient = _
+  var zkUtils : ZkUtils = _
+    
+  @Before
+  def setUp() {
+    zkClient = EasyMock.createMock(classOf[ZkClient])
+    zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
+  }
+  
+  @After
+  def tearDown() {
+    metrics.close();
+  }
 
   @Test
   def testHighWaterMarkDirectoryMapping() {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)).toArray)
-    val time = new MockTime()
-    val jTime = new JMockTime
-    val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new 
MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false))
     try {
@@ -64,7 +75,6 @@ class ReplicaManagerTest {
     } finally {
       // shutdown the replica manager upon test completion
       rm.shutdown(false)
-      metrics.close()
     }
   }
 
@@ -73,12 +83,7 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)).toArray)
-    val time = new MockTime()
-    val jTime = new JMockTime
-    val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new 
MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false))
     try {
@@ -88,7 +93,6 @@ class ReplicaManagerTest {
     } finally {
       // shutdown the replica manager upon test completion
       rm.shutdown(checkpointHW = false)
-      metrics.close()
     }
   }
 
@@ -96,12 +100,7 @@ class ReplicaManagerTest {
   def testIllegalRequiredAcks() {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)).toArray)
-    val time = new MockTime()
-    val jTime = new JMockTime
-    val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new 
MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), Option(this.getClass.getName))
     try {
@@ -116,7 +115,6 @@ class ReplicaManagerTest {
         responseCallback = callback)
     } finally {
       rm.shutdown(checkpointHW = false)
-      metrics.close()
     }
 
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
@@ -127,12 +125,7 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)).toArray)
-    val time = new MockTime()
-    val jTime = new JMockTime
-    val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new 
MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false))
 
@@ -192,7 +185,80 @@ class ReplicaManagerTest {
       assertTrue(fetchCallbackFired)
     } finally {
       rm.shutdown(checkpointHW = false)
-      metrics.close()
+    }
+  }
+  
+  @Test
+  def testFetchBeyondHighWatermarkNotAllowedForConsumer() {
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    props.put("broker.id", Int.box(0))
+    val config = KafkaConfig.fromProps(props)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)).toArray)
+    val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new 
MockScheduler(time), mockLogMgr,
+      new AtomicBoolean(false), Option(this.getClass.getName))
+    try {
+      val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 
1), new Broker(1, "host2", 2))
+      val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+      
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+      EasyMock.replay(metadataCache)
+      
+      val brokerList : java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
+      val brokerSet : java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
+      
+      val partition = rm.getOrCreatePartition(topic, 0)
+      partition.getOrCreateReplica(0)
+      
+      // Make this replica the leader.
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest(0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new 
PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, 
"host2", 2)).asJava)
+      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) 
=> {})
+      rm.getLeaderReplicaIfLocal(topic, 0)
+
+      def produceCallback(responseStatus: Map[TopicPartition, 
PartitionResponse]) = {}
+      
+      // Append two messages.
+      for(i <- 1 to 2)
+        rm.appendMessages(
+          timeout = 1000,
+          requiredAcks = -1,
+          internalTopicsAllowed = false,
+          messagesPerPartition = Map(new TopicPartition(topic, 0) -> new 
ByteBufferMessageSet(new Message("message %d".format(i).getBytes))),
+          responseCallback = produceCallback)
+      
+      var fetchCallbackFired = false
+      var fetchError = 0
+      def fetchCallback(responseStatus: Map[TopicAndPartition, 
FetchResponsePartitionData]) = {
+        fetchError = responseStatus.values.head.error
+        fetchCallbackFired = true
+      }
+      
+      // Fetch a message above the high watermark as a follower
+      rm.fetchMessages(
+        timeout = 1000,
+        replicaId = 1,
+        fetchMinBytes = 1,
+        fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) 
-> new PartitionFetchInfo(1, 100000)),
+        responseCallback = fetchCallback)
+        
+      
+      assertTrue(fetchCallbackFired)
+      assertEquals("Should not give an exception", Errors.NONE.code, 
fetchError)
+      fetchCallbackFired = false
+      
+      // Fetch a message above the high watermark as a consumer
+      rm.fetchMessages(
+        timeout = 1000,
+        replicaId = -1,
+        fetchMinBytes = 1,
+        fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) 
-> new PartitionFetchInfo(1, 100000)),
+        responseCallback = fetchCallback)
+          
+        assertTrue(fetchCallbackFired)
+        assertEquals("Should give OffsetOutOfRangeException", 
Errors.OFFSET_OUT_OF_RANGE.code, fetchError)
+    } finally {
+      rm.shutdown(checkpointHW = false)
     }
   }
 }

Reply via email to