Repository: samza
Updated Branches:
  refs/heads/master 2a71baf7c -> 46685406f


SAMZA-1822: Samza 0.14.1 not correctly handling OffsetOutOfRangeException 
exception


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/46685406
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/46685406
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/46685406

Branch: refs/heads/master
Commit: 46685406fb55d5488802e43c435005ffe6511194
Parents: 2a71baf
Author: Gaurav Agarwal <gauravagarw...@gmail.com>
Authored: Mon Aug 27 14:45:05 2018 -0700
Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Committed: Mon Aug 27 14:49:13 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/system/kafka/GetOffset.scala   |  3 +-
 .../samza/system/kafka/TestGetOffset.scala      | 31 ++++++++++++++++++--
 2 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/46685406/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
index 040e246..55b4611 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
@@ -21,12 +21,11 @@
 
 package org.apache.samza.system.kafka
 
-import kafka.common.OffsetOutOfRangeException
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import kafka.api._
 import kafka.common.TopicAndPartition
 import kafka.api.PartitionOffsetRequestInfo
 import org.apache.samza.util.Logging
-import kafka.message.MessageAndOffset
 import org.apache.samza.util.KafkaUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/46685406/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala
index b959348..ab82609 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala
@@ -26,15 +26,19 @@ import kafka.common.TopicAndPartition
 import kafka.consumer.SimpleConsumer
 import kafka.message.Message
 import kafka.message.ByteBufferMessageSet
-
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.junit._
 import org.junit.Assert._
-import org.mockito.{ Matchers, Mockito }
+import org.mockito.Mockito
 import org.mockito.Mockito._
 import org.mockito.Matchers._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
 
 class TestGetOffset {
 
+  private val outOfRangeOffset : String = "0"
+
   /**
    * An empty message set is still a valid offset. It just means that the
    * offset was for the upcoming message, which hasn't yet been written. The
@@ -49,6 +53,19 @@ class TestGetOffset {
   }
 
   /**
+    * An empty message set is still a valid offset. It just means that the
+    * offset was for the upcoming message, which hasn't yet been written. The
+    * fetch request times out in such a case, and an empty message set is
+    * returned.
+    */
+  @Test
+  def testIsValidOffsetWorksWithOffsetOutOfRangeException {
+    val getOffset = new GetOffset(OffsetRequest.LargestTimeString)
+    // Should not throw an exception.
+    assertFalse(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, 
TopicAndPartition("foo", 1), outOfRangeOffset))
+  }
+
+  /**
    * Create a default fetch simple consumer that returns empty message sets.
    */
   def getMockDefaultFetchSimpleConsumer = {
@@ -75,7 +92,15 @@ class TestGetOffset {
         fetchResponse
       }
 
-      when(sc.fetch(any(classOf[FetchRequest]))).thenReturn(fetchResponse)
+      doAnswer(new Answer[FetchResponse] {
+          override def answer(invocation: InvocationOnMock): FetchResponse = {
+            if (invocation.getArgumentAt(0, 
classOf[FetchRequest]).requestInfo.exists(
+              req => req._2.offset.toString.equals(outOfRangeOffset))) {
+              throw new OffsetOutOfRangeException("test exception")
+            }
+            fetchResponse
+          }
+        }).when(sc).fetch(any(classOf[FetchRequest]))
 
       override def fetch(request: FetchRequest): FetchResponse = {
         sc.fetch(request)

Reply via email to