Hey Zach,

I'm beginning to wonder if this line is the problem:

  
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75

If the sendRequest in SimpleConsumer catches a Throwable, it will catch
ClosedByInterruptException. This will cause the interrupt flag to get
reset, I think. If the consumer then retries the send, and it succeeds,
then I think it will continue on and ignore the interrupt. Going to poke a
Kafka dev to see if I am misunderstanding something.
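A minimal, hypothetical Scala sketch of the pattern in question (not Kafka's actual sendRequest): a wrapper that catches any Throwable and retries once. The names CatchAllRetrySketch and sendWithRetry are made up, and a java.nio Pipe stands in for the broker socket. It shows the catch-all swallowing a ClosedByInterruptException, and a retry that never looks at the interrupt flag carrying on as if nothing happened. Note that, per the JDK documentation for ClosedByInterruptException, the thread's interrupt status is still set after that exception is thrown, so a retry that reopens an interruptible channel would be expected to fail again unless something clears the flag.

```scala
import java.nio.ByteBuffer
import java.nio.channels.Pipe

// Hypothetical sketch, not Kafka's code: a catch-all "retry once" wrapper.
object CatchAllRetrySketch {
  // Runs attempt 1; on ANY Throwable (including ClosedByInterruptException)
  // it silently runs attempt 2 instead. The interrupt is never re-raised.
  def sendWithRetry[T](attempt: Int => T): T =
    try attempt(1)
    catch {
      case _: Throwable => attempt(2)
    }

  // Returns (value produced by the wrapper, whether the interrupt flag was still set).
  def demo(): (String, Boolean) = {
    val pipe = Pipe.open() // stands in for the broker connection; nothing is ever written
    Thread.currentThread().interrupt() // simulate BrokerProxy.stop interrupting this thread
    val result = sendWithRetry { n =>
      if (n == 1) {
        // A blocking channel read with the interrupt flag set throws ClosedByInterruptException.
        pipe.source().read(ByteBuffer.allocate(1))
        "first attempt"
      } else {
        "retry succeeded" // a retry that never checks the interrupt flag
      }
    }
    val flagStillSet = Thread.interrupted() // reports AND clears the flag
    pipe.sink().close()
    (result, flagStillSet)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Running main should print `(retry succeeded,true)`: the wrapper returns normally, and the interrupt survives only as a flag that nothing checks.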

Cheers,
Chris

On 1/15/15 10:52 AM, "Chris Riccomini" <[email protected]> wrote:

>Hey Zach,
>
>Hmm. It sounds like something is catching the InterruptedException, and
>letting the interrupt flag get reset. If that happens, the
>ExponentialSleepStrategy [1] will continue running.
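A minimal sketch of that failure mode, assuming a simple sleep-and-retry loop (hypothetical; this is not Samza's ExponentialSleepStrategy code). Thread.sleep clears the thread's interrupt status when it throws InterruptedException, so a catch block that neither rethrows nor calls Thread.currentThread().interrupt() leaves the loop with no way to notice the stop request.

```scala
// Hypothetical sketch (not Samza's ExponentialSleepStrategy): a retry loop that
// is meant to stop once its thread is interrupted.
object SwallowedInterruptSketch {
  def retryLoop(restoreFlag: Boolean, maxAttempts: Int): Int = {
    var attempts = 0
    var keepGoing = true
    while (keepGoing) {
      attempts += 1
      try Thread.sleep(1) // stand-in for "back off before the next attempt"
      catch {
        case _: InterruptedException =>
          // Thread.sleep has already cleared the flag by the time we get here.
          if (restoreFlag) Thread.currentThread().interrupt()
      }
      keepGoing = !Thread.currentThread().isInterrupted() && attempts < maxAttempts
    }
    attempts
  }

  // Interrupts the current thread up front, then reports how many attempts ran.
  def demo(restoreFlag: Boolean): Int = {
    Thread.currentThread().interrupt()
    val attempts = retryLoop(restoreFlag, maxAttempts = 5)
    Thread.interrupted() // leave the calling thread's flag clear
    attempts
  }

  def main(args: Array[String]): Unit = {
    println(s"swallowed: ${demo(restoreFlag = false)} attempts")
    println(s"restored:  ${demo(restoreFlag = true)} attempts")
  }
}
```

With restoreFlag = false the loop runs all 5 attempts despite the interrupt; with restoreFlag = true it stops after the first.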
>
>I walked through the code but couldn't find where it might be doing so;
>it's hard to catch these without a test.
>
>> I have never seen that "Restarting consumer due to ..." warn line in the
>>logs.
>
>If that's the case, then I don't think the BrokerProxy can be reconnecting
>since reconnect must be set to true, which would require seeing that log
>line [2].
>
>Out of curiosity, what version of the JVM are you using? Trying to
>replicate this is proving tricky.
>
>I'm beginning to wonder if this has to do with InterruptedException vs.
>ClosedByInterruptException. In TestStatefulTask, I only see the first of
>these two log lines:
>
>  case e: InterruptedException => info("Got interrupt exception in broker proxy thread.")
>  case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
>
>This is probably because the test connects to a local Kafka broker, so NIO
>socket operations are extremely fast, and the test almost always gets
>interrupted in a Thread.sleep, not in a blocking socket operation. I'm
>wondering whether a ClosedByInterruptException would cause the hanging
>that you're seeing.
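To check which exception a thread actually sees, a small Scala sketch (illustrative names, not Samza code) interrupts a worker while it is blocked either in Thread.sleep or in a blocking read on a java.nio Pipe, which is an interruptible channel just like an NIO SocketChannel.

```scala
import java.nio.ByteBuffer
import java.nio.channels.Pipe
import java.util.concurrent.atomic.AtomicReference

// Illustrative sketch: interrupt a worker thread while it is blocked in either
// Thread.sleep or a blocking NIO channel read, and report what it caught.
object WhichInterruptException {
  def interruptDuring(blockingCall: () => Unit): String = {
    val caught = new AtomicReference[String]("nothing")
    val worker = new Thread(new Runnable {
      def run(): Unit =
        try blockingCall()
        catch { case e: Exception => caught.set(e.getClass.getSimpleName) }
    })
    worker.start()
    Thread.sleep(100) // usually lets the worker block first; either order gives the same result
    worker.interrupt()
    worker.join()
    caught.get()
  }

  def main(args: Array[String]): Unit = {
    val pipe = Pipe.open() // nothing is ever written, so a read blocks until interrupted
    println(interruptDuring(() => Thread.sleep(60000)))
    println(interruptDuring(() => { pipe.source().read(ByteBuffer.allocate(1)); () }))
    pipe.sink().close()
  }
}
```

The first call should report InterruptedException and the second ClosedByInterruptException; in the second case the channel is also closed as a side effect of the interrupt.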
>
>
>Out of curiosity, is your container on the same machine as your broker, or
>a different machine?
>
>Cheers,
>Chris
>
>[1]
>https://github.com/apache/incubator-samza/blob/0.8.0/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala#L80
>[2]
>https://github.com/apache/incubator-samza/blob/0.8.0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L133
>
>On 1/15/15 9:38 AM, "Zach Cox" <[email protected]> wrote:
>
>>Hi Chris,
>>
>>
>>> Looking at your logs (from previous email, granted, different execution),
>>> I see that we get as far as SimpleConsumer.disconnect():
>>>
>>>   [DEBUG] [2015-01-14 17:19:12,596] [SAMZA-BROKER-PROXY-BrokerProxy ...]
>>>   o.a.s.s.k.DefaultFetchSimpleConsumer: Disconnecting from localhost:9192
>>>
>>> This is invoked by BrokerProxy.scala:135. The only way this should get
>>> invoked is if an exception is caught. Do you see this log line anywhere?
>>>
>>>   warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception)
>>>
>>
>>DefaultFetchConsumer.close() calls SimpleConsumer.close(), which calls
>>disconnect(), which also logs that "Disconnecting from localhost:9192"
>>line:
>>
>>https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala#L44
>>
>>https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L50
>>
>>I have never seen that "Restarting consumer due to ..." warn line in the
>>logs.
>>
>>Also note that I'm using Samza 0.8.0, not master.
>>
>>
>>> I ran our TestStatefulTask test a few times, and verified that the
>>> consumer CAN be shutdown:
>>>
>>>   [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Shutting down consumer multiplexer.
>>>   [ThreadJob] INFO org.apache.samza.system.kafka.BrokerProxy - Shutting down BrokerProxy for localhost:59075
>>>   2015-01-15 08:59:26 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>>>   [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at ...] INFO org.apache.samza.system.kafka.BrokerProxy - Got closed by interrupt exception in broker proxy thread.
>>>   [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at ...] INFO org.apache.samza.system.kafka.BrokerProxy - Shutting down due to interrupt.
>>>   // consumer shutdown is complete here. The container has moved on to shutting down the producer.
>>>   [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Shutting down producer multiplexer.
>>>
>>
>>Yesterday afternoon I was showing this to a colleague, and we did observe a
>>proper clean shutdown once. Then the next 2 times I showed him, we saw the
>>BrokerProxy blocking issue. So I do sometimes observe a proper shutdown,
>>but it seems much more common for BrokerProxy to block so that the
>>container doesn't shut down.
>>
>>Thanks,
>>Zach
>>
>>
>>
>>
>>>
>>>
>>> Cheers,
>>> Chris
>>>
>>> On 1/14/15 7:24 PM, "Zach Cox" <[email protected]> wrote:
>>>
>>> >Hi Chris,
>>> >
>>> >I did a thread dump during the shutdown process, when I could tell it
>>> >wasn't shutting down properly [1]. You can see this thread dump in the
>>> >context of the other logging at shutdown.
>>> >
>>> >The main thread is indeed blocked on the Thread.join call in
>>> >BrokerProxy.stop. The BrokerProxy thread looks like it's still
>>> >consuming from Kafka?
>>> >
>>> >The thread dump was a bit tricky; this Samza container is running in a
>>> >Docker container on boot2docker, which is Tiny Core Linux, i.e. no jstack.
>>> >I never knew this, but `sudo kill -3 [pid]` tells the JVM to do a thread
>>> >dump to stdout [2] :)
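As an aside, a JVM can also produce a similar dump from inside the process, which may help when neither jstack nor a convenient way to send the signal is available. A minimal Scala sketch using the standard Thread.getAllStackTraces API; the object name and output format are illustrative, and this is not a Samza feature.

```scala
// Sketch: build a jstack-like dump of every live thread from inside the JVM,
// using only the standard Thread.getAllStackTraces API.
object ThreadDumpSketch {
  def dump(): String = {
    val sb = new StringBuilder
    val entries = Thread.getAllStackTraces().entrySet().iterator()
    while (entries.hasNext()) {
      val entry = entries.next()
      val thread = entry.getKey()
      sb.append("\"" + thread.getName() + "\" state=" + thread.getState() + "\n")
      // One line per stack frame, innermost first.
      entry.getValue().foreach(frame => sb.append("    at " + frame + "\n"))
      sb.append("\n")
    }
    sb.toString
  }

  def main(args: Array[String]): Unit = print(dump())
}
```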
>>> >
>>> >Thanks,
>>> >Zach
>>> >
>>> >[1]
>>> >https://gist.githubusercontent.com/zcox/bb47a61d4ae1acd54056/raw/83224d20a61de20bb47bcf916d92930a71cd97ad/gistfile1.txt
>>> >
>>> >[2]
>>> >http://serverfault.com/questions/574745/jstack-alternative-for-linux-redhat-6
>>> >
>>> >
>>> >On Wed, Jan 14, 2015 at 7:30 PM, Chris Riccomini <
>>> >[email protected]> wrote:
>>> >
>>> >> Hey Zach,
>>> >>
>>> >> It sounds likely that the BrokerProxy thread is blocking or improperly
>>> >> catching an interrupt exception.
>>> >>
>>> >> Can you take a thread dump? My guess is that you'll see that the main
>>> >> thread is blocked on BrokerProxy.stop:
>>> >>
>>> >>   thread.interrupt
>>> >>   thread.join
>>> >>
>>> >>
>>> >> It'll likely be blocked on thread.join. If that's the case, I'd like to
>>> >> see what the BrokerProxy thread is doing. This line indicates that the
>>> >> thread seems to be shutting down:
>>> >>
>>> >> [DEBUG] [2015-01-14 17:19:12,596] [SAMZA-BROKER-PROXY-BrokerProxy thread
>>> >> pointed at localhost:9192 for client
>>> >> samza_consumer-twitter_message_separation-1-1421255914862-1]
>>> >> o.a.s.s.k.DefaultFetchSimpleConsumer: Disconnecting from localhost:9192
>>> >>
>>> >> But if thread.join is blocking, then the shutdown never seems to complete.
>>> >> It'd be good to see where these threads are at.
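A hypothetical Scala sketch of that interrupt-then-join shape (illustrative names, not Samza's BrokerProxy code). If anything inside the worker loop catches the interrupt and carries on, a plain thread.join never returns; the demo uses a bounded join only so it can report that instead of hanging.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch of the `thread.interrupt; thread.join` shutdown shape.
// swallowInterrupt models a catch block inside the worker that drops the interrupt.
object InterruptThenJoinSketch {
  def startWorker(swallowInterrupt: Boolean, forceStop: AtomicBoolean): Thread = {
    val t = new Thread(new Runnable {
      def run(): Unit = {
        var running = true
        while (running && !forceStop.get()) {
          try Thread.sleep(10) // stand-in for fetching from a broker
          catch {
            case _: InterruptedException =>
              if (!swallowInterrupt) running = false // honor the interrupt and exit
          }
        }
      }
    })
    t.start()
    t
  }

  // Interrupt, then join. The timeout exists only so the demo can report a hang
  // instead of hanging; returns true if the worker actually exited.
  def stop(t: Thread, joinTimeoutMs: Long): Boolean = {
    t.interrupt()
    t.join(joinTimeoutMs)
    !t.isAlive()
  }

  def demo(swallowInterrupt: Boolean): Boolean = {
    val forceStop = new AtomicBoolean(false)
    val worker = startWorker(swallowInterrupt, forceStop)
    val exited = stop(worker, joinTimeoutMs = 1000)
    forceStop.set(true) // clean up a worker that ignored the interrupt
    worker.join()
    exited
  }

  def main(args: Array[String]): Unit = {
    println(s"interrupt honored, joined: ${demo(swallowInterrupt = false)}")
    println(s"interrupt swallowed, joined: ${demo(swallowInterrupt = true)}")
  }
}
```

The first line should report true and the second false: the swallowed interrupt leaves the worker alive, which matches a main thread stuck in join.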
>>> >>
>>> >>
>>> >> Cheers,
>>> >> Chris
>>> >>
>>> >> On 1/14/15 10:49 AM, "Zach Cox" <[email protected]> wrote:
>>> >>
>>> >> >Hi - related to the discussion yesterday about graceful shutdown [1], today
>>> >> >I can't seem to get the SamzaContainer to actually shut down. Yesterday I
>>> >> >was seeing nice, fast, complete shutdown logs [2]. However, today the last
>>> >> >log line I see is related to shutting down BrokerProxy, then nothing else
>>> >> >happens until the container is finally just SIGKILLed by Docker [3].
>>> >> >
>>> >> >The fact that clean, fast shutdowns happened consistently yesterday
>>> >> >and never happen today leads me to believe I've changed something, but
>>> >> >I really can't find anything that would cause this. It's almost like
>>> >> >there's some deadlock in BrokerProxy, but I would think that would show
>>> >> >up randomly - I've tried this about 100 times today, and every time the
>>> >> >logs are as shown in [3]. Today I also see 2 exceptions in the Kafka
>>> >> >broker logs [4], but I don't know whether those were also occurring
>>> >> >yesterday when Samza was shutting down cleanly.
>>> >> >
>>> >> >One thing that doesn't seem to be happening today is this log line from
>>> >> >BrokerProxy: "Got interrupt exception in broker proxy thread."
>>> >> >
>>> >> >Has anyone seen this, or have any advice on what to try next?
>>> >> >
>>> >> >Thanks,
>>> >> >Zach
>>> >> >
>>> >> >
>>> >> >[1]
>>> >> >http://www.mail-archive.com/[email protected]/msg02246.html
>>> >> >
>>> >> >[2]
>>> >> >https://gist.githubusercontent.com/zcox/6ec8910bd3f18e36c1a2/raw/666eae24511490bf51a66e56fd90c794ea6b6282/stdout
>>> >> >
>>> >> >[3]
>>> >> >https://gist.githubusercontent.com/zcox/673a2ba607c566de7650/raw/d29ea8b99868da9cc4dc1db198315ee1d03bc694/gistfile1.txt
>>> >> >
>>> >> >[4]
>>> >> >https://gist.githubusercontent.com/zcox/f08de55c5d5fe2d70cde/raw/c4be453b5f6a57cfb5acb08508667f4b8ce8c2bd/gistfile1.txt
>>> >>
>>> >>
>>>
>>>
>
