AFAIK, from Spark 1.2.0 you can enable WAL (Write Ahead Logs) for fault tolerance, which means it can handle receiver/driver failures. You can also look at the low-level Kafka consumer <https://github.com/dibbhatt/kafka-spark-consumer>, which has a better fault-tolerance mechanism for receiver failures. This low-level consumer pushes the offset of each message it reads into ZooKeeper for fault tolerance. In your case I think mostly the "in-flight data" would be lost if you aren't using any of these fault-tolerance mechanisms.
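A minimal sketch of what this looks like in code (Scala, assuming Spark 1.2+; the app name, checkpoint path and rate value below are illustrative assumptions, not from your setup):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("kafka-hbase-streaming")  // hypothetical app name
      // Enable the Write Ahead Log so received blocks survive
      // receiver/driver failures
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      // Throttle each receiver to N records/sec (illustrative value)
      .set("spark.streaming.receiver.maxRate", "10")

    val ssc = new StreamingContext(conf, Seconds(60))
    // The WAL requires a checkpoint directory on a fault-tolerant
    // filesystem such as HDFS (this path is an assumption)
    ssc.checkpoint("hdfs:///tmp/spark/checkpoints")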
Thanks
Best Regards

On Wed, Feb 4, 2015 at 5:24 PM, Mukesh Jha <me.mukesh....@gmail.com> wrote:

> Hello Sparkans,
>
> I'm running a Spark Streaming app which reads data from a Kafka topic, does
> some processing, and then persists the results in HBase.
>
> I am using Spark 1.2.0 running on a YARN cluster with 3 executors (2 GB, 8
> cores each). I've enabled checkpointing & I am also rate limiting my
> kafkaReceivers so that the number of items read is not more than 10 records
> per sec.
> The kafkaReceiver I'm using is *not* ReliableKafkaReceiver.
>
> This app was running fine for ~3 days, then there was an increased load on
> the HBase server because of some other process querying HBase tables.
> This led to an increase in the batch processing time of the Spark batches
> (a 1 min batch took 10 min to process) which previously finished in 20 sec,
> which in turn led to the shutdown of the Spark application. PFA the
> executor logs.
>
> From the logs I'm getting the exceptions *[1]* & *[2]* below; it looks like
> there were some outstanding jobs that didn't get processed, or the jobs
> couldn't find their input data. It also seems that the shutdown hook
> gets invoked but cannot process the in-flight blocks.
>
> I have a couple of queries on this:
> 1) Does this mean that these jobs failed and the *in-flight data* is
> lost?
> 2) Does the Spark job *buffer Kafka* input data while the job is in the
> processing state for those 10 mins, and on shutdown is that too lost? (I do
> not see any OOM error in the logs.)
> 3) Can we have *explicit commits* enabled in the kafkaReceiver so that
> the offsets get committed only when the RDD(s) are successfully processed?
>
> Also I'd like to know if there is a *graceful way to shutdown a Spark app
> running on YARN*. Currently I'm killing the YARN app to stop it, which
> leads to the loss of that job's history, whereas a graceful stop would let
> the application finish and succeed and thus preserve the logs & history.
>
> *[1]* 15/02/02 19:30:11 ERROR client.TransportResponseHandler: Still have
> 1 requests outstanding when connection from
> hbase28.usdc2.cloud.com/10.193.150.221:43189 is closed
> *[2]* java.lang.Exception: Could not compute split, block
> input-2-1422901498800 not found
> *[3]*
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
> No lease on /tmp/spark/realtime-failover/msg_2378481654720966.avro (inode
> 879488): File does not exist. Holder DFSClient_NONMAPREDUCE_-148264920_63
> does not have any open files.
>
> --
> Thanks & Regards,
>
> *Mukesh Jha <me.mukesh....@gmail.com>*
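On the graceful shutdown question: instead of killing the YARN app, you can stop the StreamingContext gracefully so that in-flight batches get processed before exit. A sketch, assuming `ssc` is your app's StreamingContext (the trigger used here, a JVM shutdown hook, is just one illustrative option):

    // Graceful stop: finish processing the data already received
    // before shutting down, instead of dropping in-flight blocks
    // the way `yarn application -kill` does.
    sys.addShutdownHook {
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }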