[
https://issues.apache.org/jira/browse/SAMZA-513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14285817#comment-14285817
]
Chris Riccomini commented on SAMZA-513:
---------------------------------------
I wrote this test in TestBrokerProxy:
{code}
@Test def testStop {
  val mockOffsetGetter = mock(classOf[GetOffset])
  when(mockOffsetGetter.isValidOffset(any(), any(), any())).thenReturn(true)
  val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
  val answer = new Answer[FetchResponse]() {
    def answer(invocation: InvocationOnMock): FetchResponse = {
      try {
        System.err.println("Sleeping.")
        Thread.sleep(99999)
      } catch {
        case (e: Exception) =>
          // Pretend we caught a ClosedByInterruptException. Thread's
          // interrupt flag would be set, and SimpleConsumer would throw a
          // ClosedChannelException.
          Thread.currentThread().interrupt()
          throw new ClosedChannelException()
      }
      null
    }
  }
  when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer)
  val mockMessageSink = mock(classOf[MessageSink])
  when(mockMessageSink.needsMoreMessages(any())).thenReturn(true)
  val doNothingMetrics = new KafkaSystemConsumerMetrics()
  val brokerProxy = new BrokerProxy("", 0, "", "", doNothingMetrics,
    mockMessageSink, offsetGetter = mockOffsetGetter) {
    override def createSimpleConsumer() = {
      mockSimpleConsumer
    }
  }
  brokerProxy.addTopicPartition(new TopicAndPartition("", 0), Some("1"))
  brokerProxy.start
  Thread.sleep(1000)
  brokerProxy.stop
}
{code}
The test terminates properly. The output is:
{noformat}
12 [main] INFO org.apache.samza.system.kafka.TestBrokerProxy$$anon$4 - Starting BrokerProxy for :0
Sleeping.
1016 [main] INFO org.apache.samza.system.kafka.TestBrokerProxy$$anon$4 - Shutting down BrokerProxy for :0
1019 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at :0 for client ] WARN org.apache.samza.system.kafka.TestBrokerProxy$$anon$4 - Restarting consumer due to java.nio.channels.ClosedChannelException. Turn on debugging to get a full stack trace.
1021 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at :0 for client ] INFO org.apache.samza.system.kafka.TestBrokerProxy$$anon$4 - Shutting down due to interrupt.
{noformat}
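For reference, the interrupt handling that this test exercises can be reduced to a standalone sketch with no Mockito or Samza dependencies. This is not BrokerProxy's actual code: InterruptShutdownSketch, blockingFetch, and runAndStop are hypothetical names, and the loop only assumes the behavior suggested by the log output above (the fetch fails with ClosedChannelException while the interrupt flag is set, and the worker then exits instead of retrying).

```scala
import java.nio.channels.ClosedChannelException
import java.util.concurrent.CountDownLatch

object InterruptShutdownSketch {
  // Hypothetical stand-in for the mocked fetch: blocks, and when interrupted
  // restores the interrupt flag and throws ClosedChannelException.
  def blockingFetch(): Unit =
    try {
      Thread.sleep(99999)
    } catch {
      case _: InterruptedException =>
        Thread.currentThread().interrupt()
        throw new ClosedChannelException()
    }

  // Starts a worker that fetches in a loop, interrupts it once, and reports
  // whether the worker exited within timeoutMs (must be > 0).
  def runAndStop(timeoutMs: Long): Boolean = {
    val started = new CountDownLatch(1)
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        started.countDown()
        var running = true
        while (running) {
          try {
            blockingFetch()
          } catch {
            case _: ClosedChannelException =>
              // Assumed behavior: retry unless the interrupt flag is set.
              if (Thread.currentThread().isInterrupted) running = false
          }
        }
      }
    })
    worker.start()
    started.await()
    worker.interrupt()
    worker.join(timeoutMs)
    !worker.isAlive
  }
}
```

Calling InterruptShutdownSketch.runAndStop(5000) should return true, because the flag restored in blockingFetch is still set when the loop checks it.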
> BrokerProxy doesn't shut down
> -----------------------------
>
> Key: SAMZA-513
> URL: https://issues.apache.org/jira/browse/SAMZA-513
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.8.0
> Reporter: Zach Cox
>
> In BrokerProxy.stop, the call to thread.interrupt doesn't always lead to a
> real thread interruption, which can prevent shutdown from completing. One
> workaround is to call thread.interrupt repeatedly in a tight while loop until
> thread.isAlive returns false.
> Related mailing list thread:
> http://www.mail-archive.com/[email protected]/msg02255.html
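The workaround described in the issue could be sketched roughly as follows. This is an illustration only, not a patch to BrokerProxy: InterruptUntilDead, stopByRepeatedInterrupt, and stubbornWorker are hypothetical names, and the short join between interrupts is my own substitution for the tight loop in the description, so that the caller does not spin at full speed.

```scala
object InterruptUntilDead {
  // Interrupts the thread repeatedly until it has terminated.
  // pauseMs must be > 0, because Thread.join(0) waits forever.
  def stopByRepeatedInterrupt(thread: Thread, pauseMs: Long = 10L): Unit = {
    require(pauseMs > 0, "pauseMs must be positive")
    while (thread.isAlive) {
      thread.interrupt()
      thread.join(pauseMs)
    }
  }

  // Hypothetical worker that swallows its first interrupt, standing in for
  // an interruption that does not lead to shutdown.
  def stubbornWorker(): Thread = new Thread(new Runnable {
    def run(): Unit = {
      var interruptsSeen = 0
      while (interruptsSeen < 2) {
        try {
          Thread.sleep(99999)
        } catch {
          case _: InterruptedException => interruptsSeen += 1
        }
      }
    }
  })
}
```

A single interrupt leaves stubbornWorker running, while stopByRepeatedInterrupt keeps interrupting until isAlive returns false.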