[ 
https://issues.apache.org/jira/browse/SAMZA-513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14285817#comment-14285817
 ] 

Chris Riccomini commented on SAMZA-513:
---------------------------------------

I wrote this test in TestBrokerProxy:

{code}
  @Test def testStop {
    val mockOffsetGetter = mock(classOf[GetOffset])
    when(mockOffsetGetter.isValidOffset(any(), any(), any())).thenReturn(true)
    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
    val answer = new Answer[FetchResponse]() {
      def answer(invocation: InvocationOnMock): FetchResponse = {
        try {
          System.err.println("Sleeping.")
          Thread.sleep(99999)
        } catch {
          case (e: Exception) =>
            // Pretend we caught a ClosedByInterruptException. Thread's 
            // interrupt flag would be set, and SimpleConsumer would throw a 
            // ClosedChannelException.
            Thread.currentThread().interrupt()
            throw new ClosedChannelException()
        }
        null
      }
    }
    when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer)
    val mockMessageSink = mock(classOf[MessageSink])
    when(mockMessageSink.needsMoreMessages(any())).thenReturn(true)
    val doNothingMetrics = new KafkaSystemConsumerMetrics()
    val brokerProxy = new BrokerProxy("", 0, "", "", doNothingMetrics,
      mockMessageSink, offsetGetter = mockOffsetGetter) {
      override def createSimpleConsumer() = {
        mockSimpleConsumer
      }
    }
    brokerProxy.addTopicPartition(new TopicAndPartition("", 0), Some("1"))
    brokerProxy.start
    Thread.sleep(1000)
    brokerProxy.stop
  }
{code}

The test terminates properly. The output is:

{noformat}
12 [main] INFO org.apache.samza.system.kafka.TestBrokerProxy$$anon$4 - Starting BrokerProxy for :0
Sleeping.
1016 [main] INFO org.apache.samza.system.kafka.TestBrokerProxy$$anon$4 - Shutting down BrokerProxy for :0
1019 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at :0 for client ] WARN org.apache.samza.system.kafka.TestBrokerProxy$$anon$4 - Restarting consumer due to java.nio.channels.ClosedChannelException. Turn on debugging to get a full stack trace.
1021 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at :0 for client ] INFO org.apache.samza.system.kafka.TestBrokerProxy$$anon$4 - Shutting down due to interrupt.
{noformat}
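
The log shows a specific ordering: the interrupt surfaces as a ClosedChannelException, the proxy logs a restart, and only then notices the interrupt flag and exits. Below is a minimal, self-contained sketch of that pattern. It is not the BrokerProxy source; InterruptAwareLoop, fetch, demo, and the event strings are hypothetical names used only for illustration.

```scala
import java.nio.channels.ClosedChannelException
import java.util.concurrent.{CopyOnWriteArrayList, CountDownLatch}

object InterruptAwareLoop {
  // Hypothetical stand-in for a blocking fetch. Like the mocked consumer in
  // the test, it restores the interrupt flag and throws ClosedChannelException.
  def fetch(started: CountDownLatch): Unit = {
    try {
      started.countDown()
      Thread.sleep(99999)
    } catch {
      case _: InterruptedException =>
        Thread.currentThread().interrupt()
        throw new ClosedChannelException()
    }
  }

  // Starts a worker, interrupts it once, and returns the recorded events.
  def demo(): java.util.List[String] = {
    val events = new CopyOnWriteArrayList[String]()
    val started = new CountDownLatch(1)
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        // The loop condition is what turns the interrupt into a shutdown.
        while (!Thread.currentThread().isInterrupted) {
          try {
            fetch(started)
          } catch {
            case _: ClosedChannelException =>
              // Treated as recoverable; the loop condition decides what's next.
              events.add("restarting")
          }
        }
        events.add("shutting down due to interrupt")
      }
    })
    worker.start()
    started.await() // wait until the worker has entered fetch
    worker.interrupt()
    worker.join(5000)
    events
  }
}
```

The sketch only works because the catch block re-sets the interrupt flag before throwing. If the flag were lost, the loop would retry the fetch and block again.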

> BrokerProxy doesn't shut down
> -----------------------------
>
>                 Key: SAMZA-513
>                 URL: https://issues.apache.org/jira/browse/SAMZA-513
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.8.0
>            Reporter: Zach Cox
>
> In BrokerProxy.stop, the call to thread.interrupt doesn't always lead to a 
> real thread interruption, which can prevent shutdown from occurring. One 
> workaround is to call thread.interrupt repeatedly in a tight while loop until 
> thread.isAlive returns false.
> Related mailing list thread: 
> http://www.mail-archive.com/[email protected]/msg02255.html
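
The workaround in the description can be sketched roughly as follows. This is an illustration rather than code from Samza: InterruptUntilStopped and stop are hypothetical names, and the short join between attempts is an assumption added here so the loop does not spin at full speed, whereas the description mentions a tight loop.

```scala
object InterruptUntilStopped {
  // Hypothetical helper, not part of Samza: re-send the interrupt until the
  // thread exits. join(10) waits up to 10 ms between attempts.
  def stop(thread: Thread): Unit = {
    while (thread.isAlive) {
      thread.interrupt()
      thread.join(10)
    }
  }
}
```

A caller would pass the proxy's worker thread to stop. The loop never returns if the thread ignores interrupts indefinitely, so a real implementation would likely want an upper bound on the attempts.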



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
