Repository: samza Updated Branches: refs/heads/master 2a71baf7c -> 46685406f
SAMZA-1822: Samza 0.14.1 not correctly handling OffsetOutOfRangeException exception Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/46685406 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/46685406 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/46685406 Branch: refs/heads/master Commit: 46685406fb55d5488802e43c435005ffe6511194 Parents: 2a71baf Author: Gaurav Agarwal <gauravagarw...@gmail.com> Authored: Mon Aug 27 14:45:05 2018 -0700 Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Committed: Mon Aug 27 14:49:13 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/system/kafka/GetOffset.scala | 3 +- .../samza/system/kafka/TestGetOffset.scala | 31 ++++++++++++++++++-- 2 files changed, 29 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/46685406/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala index 040e246..55b4611 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala @@ -21,12 +21,11 @@ package org.apache.samza.system.kafka -import kafka.common.OffsetOutOfRangeException +import org.apache.kafka.common.errors.OffsetOutOfRangeException import kafka.api._ import kafka.common.TopicAndPartition import kafka.api.PartitionOffsetRequestInfo import org.apache.samza.util.Logging -import kafka.message.MessageAndOffset import org.apache.samza.util.KafkaUtil /** http://git-wip-us.apache.org/repos/asf/samza/blob/46685406/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala index b959348..ab82609 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala @@ -26,15 +26,19 @@ import kafka.common.TopicAndPartition import kafka.consumer.SimpleConsumer import kafka.message.Message import kafka.message.ByteBufferMessageSet - +import org.apache.kafka.common.errors.OffsetOutOfRangeException import org.junit._ import org.junit.Assert._ -import org.mockito.{ Matchers, Mockito } +import org.mockito.Mockito import org.mockito.Mockito._ import org.mockito.Matchers._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer class TestGetOffset { + private val outOfRangeOffset : String = "0" + /** * An empty message set is still a valid offset. It just means that the * offset was for the upcoming message, which hasn't yet been written. The @@ -49,6 +53,19 @@ class TestGetOffset { } /** + * An empty message set is still a valid offset. It just means that the + * offset was for the upcoming message, which hasn't yet been written. The + * fetch request times out in such a case, and an empty message set is + * returned. + */ + @Test + def testIsValidOffsetWorksWithOffsetOutOfRangeException { + val getOffset = new GetOffset(OffsetRequest.LargestTimeString) + // Should not throw an exception. + assertFalse(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, TopicAndPartition("foo", 1), outOfRangeOffset)) + } + + /** * Create a default fetch simple consumer that returns empty message sets. */ def getMockDefaultFetchSimpleConsumer = { @@ -75,7 +92,15 @@ class TestGetOffset { fetchResponse } - when(sc.fetch(any(classOf[FetchRequest]))).thenReturn(fetchResponse) + doAnswer(new Answer[FetchResponse] { + override def answer(invocation: InvocationOnMock): FetchResponse = { + if (invocation.getArgumentAt(0, classOf[FetchRequest]).requestInfo.exists( + req => req._2.offset.toString.equals(outOfRangeOffset))) { + throw new OffsetOutOfRangeException("test exception") + } + fetchResponse + } + }).when(sc).fetch(any(classOf[FetchRequest])) override def fetch(request: FetchRequest): FetchResponse = { sc.fetch(request)