>> checkpoints can't be used between controlled restarts

Is that true? If so, why? From my testing, checkpoints appear to be working fine: we get the data we missed between the time the consumer went down and the time we brought it back up.
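For reference, this is roughly how we wire up the checkpoint-based recovery that we see working across restarts. It's only a minimal sketch against the Spark 1.3+ Kafka direct API; the checkpoint path, broker address and topic name are placeholders, and the per-batch processing is just a stand-in:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object CheckpointedConsumer {
  // Placeholder checkpoint location; in practice a reliable store such as HDFS.
  val checkpointDir = "hdfs:///checkpoints/my-app"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("checkpointed-kafka-consumer")
    val ssc  = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("my-topic"))

    stream.map(_._2).foreachRDD { rdd =>
      // Real processing would go here; count() is just a stand-in.
      println(s"processed ${rdd.count()} records in this batch")
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // A clean start builds a fresh context; a restart rebuilds the context
    // (including the direct stream's Kafka offsets) from the checkpoint.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}

The caveat being discussed in this thread is that the checkpoint serializes the DStream graph (i.e. your code and configuration), which is why recovery across a restart of the same jar works fine, while recovery across a code upgrade is where it gets murky.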
>> If I cannot make checkpoints between code upgrades, does it mean that Spark
>> does not help me at all with keeping my Kafka offsets? Does it mean that I
>> have to implement my own storing to/initialization of offsets from Zookeeper?

By code upgrades, do you mean code changes to the consumer program? If so, one idea we've been entertaining is this: if the consumer changes, and especially if its configuration parameters change, some of the older configuration may still be stuck in the checkpoint. What we'd do in that case is, prior to starting the consumer, blow away the checkpoint directory and re-consume from Kafka from the smallest offsets. In our case it's OK to re-process; I realize that in many cases that may not be an option. If it isn't, then it would seem to follow that you have to manage offsets in ZK yourselves (a rough sketch of what that could look like is at the end of this mail, below the quoted thread).

Another thing to consider would be to treat upgrades operationally. That is, if an upgrade is to happen, consume the data up to a certain point, then bring the system down for the upgrade. Remove the checkpoint directory. Restart everything; the system would then rebuild the checkpoint using your upgraded consumers. (Again, this may not be possible in systems where the data influx is constant and/or the data is mission critical.)

Perhaps this discussion implies that there is room for a new feature in Spark where it intelligently drops the checkpoint, or lets you selectively pluck out and drop some items, prior to restarting...

On Thu, Sep 10, 2015 at 6:22 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> This consumer pretty much covers all those scenarios you listed
> github.com/dibbhatt/kafka-spark-consumer Give it a try.
>
> Thanks
> Best Regards
>
> On Thu, Sep 10, 2015 at 3:32 PM, Krzysztof Zarzycki <k.zarzy...@gmail.com>
> wrote:
>
>> Hi there,
>> I have a problem with fulfilling all my needs when using Spark Streaming
>> on Kafka. Let me enumerate my requirements:
>> 1. I want to have at-least-once/exactly-once processing.
>> 2. I want my application to be tolerant of faults and simple stops. The
>> Kafka offsets need to be tracked between restarts.
>> 3. I want to be able to upgrade the code of my application without losing
>> Kafka offsets.
>>
>> Now what my requirements imply, according to my knowledge:
>> 1. implies using the new Kafka DirectStream.
>> 2. implies using checkpointing. The Kafka DirectStream will write offsets
>> to the checkpoint as well.
>> 3. implies that checkpoints can't be used between controlled restarts. So
>> I need to install a shutdownHook with ssc.stop(stopGracefully=true) (here
>> is a description of how:
>> https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully
>> )
>>
>> Now my problems are:
>> 1. If I cannot make checkpoints between code upgrades, does it mean that
>> Spark does not help me at all with keeping my Kafka offsets? Does it mean
>> that I have to implement my own storing to/initialization of offsets from
>> Zookeeper?
>> 2. When I set up the shutdownHook and any of my executors throws an
>> exception, it seems that the application does not fail, but gets stuck in
>> the running state. Is that because stopGracefully deadlocks on exceptions?
>> How can I overcome this problem? Maybe I can avoid setting the shutdownHook
>> and there is another way to stop my app gracefully?
>>
>> 3. If I somehow overcome 2., is it enough to just stop my app gracefully
>> to be able to upgrade the code & not lose Kafka offsets?
>>
>>
>> Thank you a lot for your answers,
>> Krzysztof Zarzycki
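As for managing offsets in ZK yourselves (referenced above): here is a rough sketch of what that could look like with the direct stream, assuming the 0.8 direct API from Spark 1.3+. The loadOffsets/saveOffsets helpers are hypothetical stand-ins for a real ZooKeeper-backed (or HBase/RDBMS-backed) store, and the broker and topic names are placeholders:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object SelfManagedOffsets {
  // Hypothetical offset store; a real implementation would read/write ZooKeeper.
  def loadOffsets(): Map[TopicAndPartition, Long] = Map(
    TopicAndPartition("my-topic", 0) -> 0L,
    TopicAndPartition("my-topic", 1) -> 0L
  )
  def saveOffsets(ranges: Array[OffsetRange]): Unit =
    ranges.foreach(r => println(s"${r.topic}-${r.partition}: ${r.untilOffset}"))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("self-managed-offsets")
    val ssc  = new StreamingContext(conf, Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    // Start the direct stream exactly at the offsets we stored ourselves,
    // so an upgraded jar does not depend on the old checkpoint being readable.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd.message()
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc, kafkaParams, loadOffsets(), messageHandler)

    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Real processing of `rdd` would go here; only afterwards do we persist
      // the offsets, ideally atomically with the results.
      saveOffsets(ranges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

The point of this pattern is that the offsets are committed by your own code only after the batch's output is persisted, so restarting with new code simply means reading the last committed offsets instead of deserializing a checkpoint.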