Repository: samza
Updated Branches:
  refs/heads/master bc7a07a1a -> 74aa516fd
SAMZA-753 - BrokerProxy stop should stop the Kafka consumer first

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/74aa516f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/74aa516f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/74aa516f

Branch: refs/heads/master
Commit: 74aa516fd84f510cd9e92e9bff90e480845763d2
Parents: bc7a07a
Author: Yan Fang <yanfang...@gmail.com>
Authored: Tue Nov 17 16:58:07 2015 -0800
Committer: Navina <navi.trin...@gmail.com>
Committed: Tue Nov 17 17:06:01 2015 -0800

----------------------------------------------------------------------
 .../org/apache/samza/system/kafka/BrokerProxy.scala |  5 +++++
 .../apache/samza/system/kafka/TestBrokerProxy.scala | 13 +++++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/74aa516f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index c8cbc38..9aa9818 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -303,6 +303,11 @@ class BrokerProxy(
   def stop {
     info("Shutting down " + toString)
 
+    if (simpleConsumer != null) {
+      info("closing simple consumer...")
+      simpleConsumer.close
+    }
+
     thread.interrupt
     thread.join
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/74aa516f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 170318e..cc7077c 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -417,4 +417,17 @@ class TestBrokerProxy extends Logging {
     }
     assertEquals(true, caughtError)
   }
+
+  @Test
+  def brokerProxyStopCloseConsumer: Unit = {
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+    val bp = new BrokerProxy("host", 0, "system", "clientID", new KafkaSystemConsumerMetrics(), null){
+      override def createSimpleConsumer() = {
+        mockSimpleConsumer
+      }
+    }
+    bp.start
+    bp.stop
+    verify(mockSimpleConsumer).close
+  }
 }