Re: How to shut down Spark Streaming with Kafka properly?

2014-06-06 Thread Sean Owen
I closed that PR for other reasons, but this change is still proposed on its own:

https://issues.apache.org/jira/browse/SPARK-2034
https://github.com/apache/spark/pull/980



Re: How to shut down Spark Streaming with Kafka properly?

2014-06-05 Thread Tobias Pfeiffer
Sean,

your patch fixes the issue, thank you so much! (This is the second
time within one week that I have run into network libraries not shutting
down threads properly; I'm really glad your code fixes the issue.)

I saw that your pull request was closed but not merged. Can I do
anything to get your fix into Spark? Open an issue, send a pull
request myself, etc.?

Thanks
Tobias


Re: How to shut down Spark Streaming with Kafka properly?

2014-06-05 Thread Tobias Pfeiffer
Sean,

thanks for your link! I will try this ASAP!

On Thu, Jun 5, 2014 at 6:49 PM, Sean Owen  wrote:
> However I do seem to be able to shut down everything cleanly and
> terminate my (Java-based) program. I just call
> StreamingContext.stop(true, true). I don't know why it's different.

I think that's a peculiarity of sbt, cf.
http://www.scala-sbt.org/0.12.2/docs/Detailed-Topics/Running-Project-Code.html
If I call System.exit(), that will certainly terminate my program (and
sbt) as well, but in general I think it's better not to have these
threads lurking around ;-)

Thanks
Tobias


Re: How to shut down Spark Streaming with Kafka properly?

2014-06-05 Thread Sean Owen
Yes, I noted the same two issues: there is an Executor that is never
shut down, and the ConsumerConnector is never shut down either.
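
An Executor that is never shut down keeps its worker threads alive, and those threads are non-daemon by default. A minimal JDK-only sketch of the usual shutdown sequence (`shutdown()`, a bounded `awaitTermination`, then `shutdownNow()` to interrupt stragglers). The class name `ExecutorShutdownSketch` and the timeouts are illustrative assumptions, not code from the Spark patch:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ExecutorShutdownSketch {

    /**
     * Stops a pool: lets running tasks finish for up to timeoutMs, then interrupts them.
     * Returns true if every worker thread has terminated.
     */
    static boolean shutdownAndAwait(ExecutorService pool, long timeoutMs) {
        pool.shutdown(); // reject new tasks; already-submitted tasks keep running
        try {
            if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow(); // interrupt tasks that are still running
                return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        // Worker threads from newFixedThreadPool are non-daemon, so a pool that is
        // never shut down keeps the JVM alive.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> {
            try {
                Thread.sleep(60_000); // stands in for a blocking consumer loop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit promptly when interrupted
            }
        });
        System.out.println("terminated = " + shutdownAndAwait(pool, 500));
    }
}
```

The blocking task only ends because it honors interruption; a task that swallows interrupts would survive `shutdownNow()` and still hold the JVM open.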

I foolishly tacked on a change to this effect on a different PR
(https://github.com/apache/spark/pull/926/files#diff-bf41e92a42a1bdb3bc1662fd9fc50fe2L38)
Maybe I can just propose this as a stand-alone change (and/or you can
try it separately).

However I do seem to be able to shut down everything cleanly and
terminate my (Java-based) program. I just call
StreamingContext.stop(true, true). I don't know why it's different.

But I think cleaning up the non-daemon threads from that pool is a
good change that should fix your issue. One sec...



How to shut down Spark Streaming with Kafka properly?

2014-06-05 Thread Tobias Pfeiffer
Hi,

I am trying to use Spark Streaming with Kafka, which works like a
charm -- except for shutdown. When I run my program with "sbt
run-main", sbt never exits, because two non-daemon threads are left
behind and never terminate.
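
The distinction that matters here is daemon vs. non-daemon: a JVM does not exit on its own while any non-daemon thread is still running. A small JDK-only sketch showing that the JDK's default thread factory produces non-daemon threads, alongside a hypothetical `daemonFactory` helper. Marking threads as daemon is an alternative mitigation that stops them from blocking exit; it is not the fix discussed in this thread, which shuts the threads down instead:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class DaemonThreads {

    /** Hypothetical helper: a ThreadFactory that marks every thread it creates as a daemon. */
    static ThreadFactory daemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            return t;
        };
    }

    public static void main(String[] args) {
        Runnable noop = () -> { };
        // The JDK's default factory (used by Executors.newFixedThreadPool etc.)
        // always creates non-daemon threads.
        Thread fromDefault = Executors.defaultThreadFactory().newThread(noop);
        Thread fromDaemon = daemonFactory("consumer").newThread(noop);
        System.out.println(fromDefault.isDaemon()); // false
        System.out.println(fromDaemon.isDaemon());  // true
    }
}
```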

I created a minimal example at
.
It starts a StreamingContext and does nothing more than connect to
a Kafka server and print what it receives. Using the `future { ... }`
construct, I shut down the StreamingContext after a few seconds and
then print the difference between the threads at start time and at end
time. The output can be found at
.
There are a number of threads remaining that will prevent sbt from
exiting.
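
The linked example itself is not preserved here, but the "difference between the threads at start time and at end time" technique can be sketched with the JDK's `Thread.getAllStackTraces()`. In this sketch the `ThreadDiff` class and the `lurking-consumer` thread are hypothetical stand-ins for a library thread that is never shut down:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

class ThreadDiff {

    /** Snapshot of all live threads in this JVM. */
    static Set<Thread> liveThreads() {
        return new HashSet<>(Thread.getAllStackTraces().keySet());
    }

    /** Threads alive now that were not present in the earlier snapshot. */
    static Set<Thread> leftover(Set<Thread> before) {
        Set<Thread> after = liveThreads();
        after.removeAll(before);
        return after;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<Thread> before = liveThreads();

        // Hypothetical stand-in for a thread a library starts but never stops.
        CountDownLatch release = new CountDownLatch(1);
        Thread lurker = new Thread(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "lurking-consumer");
        lurker.start();

        for (Thread t : leftover(before)) {
            // Only the non-daemon entries keep the JVM from exiting.
            System.out.println(t.getName() + " daemon=" + t.isDaemon());
        }

        release.countDown(); // let the stand-in thread finish so this demo exits
        lurker.join();
    }
}
```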

When I replace `KafkaUtils.createStream(...)` with a call that does
exactly the same, except that it calls `consumerConnector.shutdown()`
in `KafkaReceiver.onStop()` (which it should, IMO), the output is as
shown at .
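
Putting Tobias's change (calling `consumerConnector.shutdown()` in `KafkaReceiver.onStop()`) together with Sean's point about the Executor suggests an `onStop()` that releases both resources. A minimal JDK-only sketch of that shape; `Connector` and `CleanReceiver` are hypothetical stand-ins, not the Spark or Kafka classes, and the actual patch may be structured differently:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for Kafka's ConsumerConnector; only shutdown() matters here. */
interface Connector {
    void shutdown();
}

/** Hypothetical receiver that owns a connector and a pool of consumer threads. */
class CleanReceiver {
    private final Connector connector;
    private ExecutorService pool;

    CleanReceiver(Connector connector) {
        this.connector = connector;
    }

    void onStart() {
        pool = Executors.newFixedThreadPool(1); // non-daemon worker thread
        pool.submit(() -> {
            // A real receiver would iterate over the Kafka message streams here.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // ends the loop
                }
            }
        });
    }

    /** Releases the connector and the pool; returns true if the pool terminated. */
    boolean onStop() {
        if (connector != null) {
            connector.shutdown();
        }
        if (pool == null) {
            return true;
        }
        pool.shutdownNow(); // interrupt the consumer loop
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool = null;
        }
    }

    public static void main(String[] args) {
        CleanReceiver receiver = new CleanReceiver(() -> System.out.println("connector shut down"));
        receiver.onStart();
        System.out.println("pool terminated = " + receiver.onStop());
    }
}
```

Without the `shutdownNow()` call, the worker thread would outlive `onStop()` and keep the JVM (and sbt) running.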

Does anyone have *any* idea what is going on here and why the program
doesn't shut down properly? The behavior is the same with both Kafka
0.8.0 and 0.8.1.1, by the way.

Thanks
Tobias