SAMZA-590; Kafka BrokerProxy should relinquish ownership of all partitions when its consumer fails
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c725b3ce Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c725b3ce Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c725b3ce Branch: refs/heads/samza-sql Commit: c725b3ceb6d2e116f5da257ba45fe70fa877747a Parents: b82d458 Author: Chris Riccomini <[email protected]> Authored: Wed Mar 11 09:28:58 2015 -0700 Committer: Chris Riccomini <[email protected]> Committed: Wed Mar 11 09:28:58 2015 -0700 ---------------------------------------------------------------------- .../apache/samza/system/kafka/BrokerProxy.scala | 28 +++++++--- .../samza/system/kafka/TestBrokerProxy.scala | 59 ++++++++++++++++++++ 2 files changed, 80 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c725b3ce/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index f768263..c6e231a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -153,8 +153,9 @@ class BrokerProxy( }, (exception, loop) => { - warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception) + warn("Restarting consumer due to %s. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace." 
format exception) debug("Exception detail:", exception) + abdicateAll reconnect = true }) } catch { @@ -182,7 +183,6 @@ class BrokerProxy( nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) } } else { - refreshLatencyMetrics debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions)) @@ -193,13 +193,27 @@ class BrokerProxy( } } - def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = { + /** + * Releases ownership for a single TopicAndPartition. The + * KafkaSystemConsumer will try and find a new broker for the + * TopicAndPartition. + */ + def abdicate(tp: TopicAndPartition) = removeTopicPartition(tp) match { // Need to be mindful of a tp that was removed by another thread - def abdicate(tp:TopicAndPartition) = removeTopicPartition(tp) match { - case Some(offset) => messageSink.abdicate(tp, offset) - case None => warn("Tried to abdicate for topic partition not in map. Removed in interim?") - } + case Some(offset) => messageSink.abdicate(tp, offset) + case None => warn("Tried to abdicate for topic partition not in map. Removed in interim?") + } + /** + * Releases all TopicAndPartition ownership for this BrokerProxy thread. The + * KafkaSystemConsumer will try and find a new broker for the + * TopicAndPartition. 
+ */ + def abdicateAll { + nextOffsets.keySet.foreach(abdicate(_)) + } + + def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = { // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves case class Error(tp: TopicAndPartition, code: Short, exception: Throwable) http://git-wip-us.apache.org/repos/asf/samza/blob/c725b3ce/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index d559d8b..e285dec 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -21,6 +21,7 @@ package org.apache.samza.system.kafka import java.nio.ByteBuffer +import java.nio.channels.ClosedChannelException import java.util.concurrent.CountDownLatch import kafka.api._ @@ -299,4 +300,62 @@ class TestBrokerProxy extends Logging { fail(failString) } } + + /** + * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions + * that it owns when a consumer failure occurs. 
+ */ + @Test def brokerProxyAbdicatesOnConnectionFailure():Unit = { + val countdownLatch = new CountDownLatch(1) + var abdicated: Option[TopicAndPartition] = None + @volatile var refreshDroppedCount = 0 + val mockMessageSink = new MessageSink { + override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) { + } + override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) { + } + override def abdicate(tp: TopicAndPartition, nextOffset: Long) { + abdicated = Some(tp) + countdownLatch.countDown + } + override def refreshDropped() { + refreshDroppedCount += 1 + } + override def needsMoreMessages(tp: TopicAndPartition): Boolean = { + true + } + } + + val doNothingMetrics = new KafkaSystemConsumerMetrics() + val tp = new TopicAndPartition("topic", 42) + val mockOffsetGetter = mock(classOf[GetOffset]) + val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer]) + + when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true) + when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492l) + when(mockSimpleConsumer.defaultFetch(any())).thenThrow(new SamzaException("Pretend this is a ClosedChannelException. Can't use ClosedChannelException because it's checked, and Mockito doesn't like that.")) + + val bp = new BrokerProxy("host", 567, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) { + override def createSimpleConsumer() = { + mockSimpleConsumer + } + } + + val waitForRefresh = () => { + val currentRefreshDroppedCount = refreshDroppedCount + while (refreshDroppedCount == currentRefreshDroppedCount) { + Thread.sleep(100) + } + } + + bp.addTopicPartition(tp, Option("0")) + bp.start + // BP should refresh on startup. 
+ waitForRefresh() + countdownLatch.await() + // BP should continue refreshing after it's abdicated all TopicAndPartitions. + waitForRefresh() + bp.stop + assertEquals(tp, abdicated.getOrElse(null)) + } }
