Re: How to recover in case user errors in streaming

2015-07-06 Thread Tathagata Das
1. onBatchError is not a bad idea. 2. It works for the Kafka Direct API and file streams as well; they all have batches. However, you will not get the number of records for the file stream. 3. Mind giving an example of the exception you would like to see caught? TD On Wed, Jul 1, 2015 at 10:35 AM,
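For readers looking for the hook being discussed: onBatchError does not exist in StreamingListener at this point (it is only being proposed above), but the existing onBatchCompleted callback already exposes per-batch info, including the record count TD mentions. A minimal Scala sketch, assuming a running StreamingContext named ssc:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // Sketch only: onBatchError is a proposal in this thread, not an existing API.
    // The existing listener can at least observe completed batches and record counts.
    class BatchMonitor extends StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        val info = batchCompleted.batchInfo
        // As TD notes above, file streams do not report a record count.
        println(s"Batch ${info.batchTime} completed with ${info.numRecords} records")
      }
    }

    // Registration on an existing StreamingContext:
    // ssc.addStreamingListener(new BatchMonitor)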

Re: How to recover in case user errors in streaming

2015-07-06 Thread Li,ZhiChao
Hi Cody and TD, Just trying to understand this under the hood, but I cannot find any place with this specific logic: once you reach max failures, the whole stream will stop. If possible, could you point me in the right direction? To my understanding, the exception thrown from the job would

Re: How to recover in case user errors in streaming

2015-07-01 Thread Amit Assudani
Hi TD, Why don’t we have an OnBatchError or similar method in StreamingListener? Also, is StreamingListener only for the receiver-based approach, or does it work for the Kafka Direct API / file-based streaming as well? Regards, Amit From: Tathagata Das t...@databricks.com

Re: How to recover in case user errors in streaming

2015-06-29 Thread Amit Assudani
Thanks TD, this helps. Looking forward to some fix where the framework handles batch failures via callback methods; this will avoid having to write try/catch in every transformation / action. Regards, Amit From: Tathagata Das t...@databricks.com Date:

Re: How to recover in case user errors in streaming

2015-06-29 Thread Amit Assudani
Also, how do you suggest catching exceptions when using a connector API like saveAsNewAPIHadoopFiles? From: amit assudani aassud...@impetus.com Date: Monday, June 29, 2015 at 9:55 AM To: Tathagata Das t...@databricks.com Cc: Cody

Re: How to recover in case user errors in streaming

2015-06-29 Thread Tathagata Das
I recommend writing the output using dstream.foreachRDD and then calling rdd.saveAsNewAPIHadoopFile inside a try/catch. See the implementation of dstream.saveAsNewAPIHadoopFiles: https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L716 On
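A minimal sketch of TD's suggestion (Scala; the output path, the Text key/value types, and the TextOutputFormat below are placeholder assumptions, not part of the thread):

    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    import org.apache.spark.streaming.dstream.DStream

    // dstream is assumed to be a DStream of (Text, Text) pairs.
    def saveWithErrorHandling(dstream: DStream[(Text, Text)]): Unit = {
      dstream.foreachRDD { (rdd, time) =>
        try {
          // The same save that saveAsNewAPIHadoopFiles performs internally,
          // but wrapped so a failed batch job becomes catchable.
          rdd.saveAsNewAPIHadoopFile(
            s"/tmp/output-${time.milliseconds}",   // placeholder path
            classOf[Text], classOf[Text],
            classOf[TextOutputFormat[Text, Text]])
        } catch {
          case e: Exception =>
            // Application decides: log and skip this batch, retry, or stop the context.
            println(s"Batch at $time failed: ${e.getMessage}")
        }
      }
    }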

Re: How to recover in case user errors in streaming

2015-06-27 Thread Tathagata Das
I looked at the code and found that batch exceptions are indeed ignored. This is worth fixing: batch exceptions should not be silently ignored. Also, you can catch failed batch jobs (irrespective of the number of retries) by catching the exception in foreachRDD. Here is an

Re: How to recover in case user errors in streaming

2015-06-26 Thread Amit Assudani
Hmm, not sure why, but when I run this code, it always keeps on consuming from Kafka and proceeds, ignoring the previous failed batches. Also, now that I get the attempt number from TaskContext and I have the max-retries information, I am supposed to handle it in the try/catch block, but does it

Re: How to recover in case user errors in streaming

2015-06-26 Thread Cody Koeninger
No, if you have a bad message that you are continually throwing exceptions on, your stream will not progress to future batches. On Fri, Jun 26, 2015 at 10:28 AM, Amit Assudani aassud...@impetus.com wrote: Also, what I understand is, max failures doesn’t stop the entire stream, it fails the

Re: How to recover in case user errors in streaming

2015-06-26 Thread Cody Koeninger
If you're consistently throwing exceptions and thus failing tasks, once you reach max failures the whole stream will stop. It's up to you to either catch those exceptions, or restart your stream appropriately once it stops. Keep in mind that if you're relying on checkpoints, and fixing the error
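The retry limit in question is Spark's task-level setting spark.task.maxFailures (default 4). A sketch of raising it, with the value 8 purely as an example:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("streaming-recovery")   // placeholder app name
      // Number of task failures tolerated before the job, and with it the
      // stream, is failed. Default is 4; 8 is an example, not a recommendation.
      .set("spark.task.maxFailures", "8")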

How to recover in case user errors in streaming

2015-06-26 Thread Amit Assudani
Problem: how do we recover from user errors (connectivity issues / storage service down / etc.)? Environment: Spark streaming using Kafka Direct Streams Code Snippet: HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(kafkaTopic1)); HashMap<String, String> kafkaParams = new HashMap<String,
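The Java snippet above is cut off; for context, a roughly equivalent Scala setup of a direct Kafka stream (the broker address, topic name, and batch interval are placeholders):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("kafka-direct")   // placeholder app name
    val ssc = new StreamingContext(conf, Seconds(10))       // placeholder interval

    val topicsSet = Set("kafkaTopic1")                                // placeholder topic
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // placeholder broker

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)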

Re: How to recover in case user errors in streaming

2015-06-26 Thread Amit Assudani
Thanks for the quick response. My question here is: how do I know that the max retries are done (because in my code I never know whether it is the failure of the first try or the last try) and that I need to handle this message; is there any callback? Also, I know the limitation of checkpoints when upgrading

Re: How to recover in case user errors in streaming

2015-06-26 Thread Cody Koeninger
TaskContext has an attemptNumber method on it. If you want to know which messages failed, you have access to the offsets, and can do whatever you need to with them. On Fri, Jun 26, 2015 at 10:21 AM, Amit Assudani aassud...@impetus.com wrote: Thanks for quick response, My question here is
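A sketch of how attemptNumber can be combined with the max-failures setting (Scala; the handling on the final attempt is application-specific, and maxFailures must be kept in sync with spark.task.maxFailures):

    import org.apache.spark.TaskContext
    import org.apache.spark.rdd.RDD

    def processWithGiveUp(rdd: RDD[String], maxFailures: Int = 4): Unit = {
      rdd.foreachPartition { records =>
        val ctx = TaskContext.get()   // non-null here: this code runs inside a task
        try {
          records.foreach { r =>
            // placeholder: real per-record processing that may throw goes here
            println(r)
          }
        } catch {
          // attemptNumber is 0-based, so the last allowed attempt is maxFailures - 1.
          case e: Exception if ctx.attemptNumber() >= maxFailures - 1 =>
            // Final attempt: record/park the bad data instead of rethrowing,
            // so the stream can move on to the next batch.
            println(s"Giving up after attempt ${ctx.attemptNumber()}: ${e.getMessage}")
          case e: Exception =>
            throw e                   // let Spark retry the task
        }
      }
    }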

Re: How to recover in case user errors in streaming

2015-06-26 Thread Amit Assudani
Also, what I understand is that max failures doesn’t stop the entire stream; it fails the job created for the specific batch, but the subsequent batches still proceed. Isn’t that right? And the question still remains: how to keep track of those failed batches? From: amit assudani

Re: How to recover in case user errors in streaming

2015-06-26 Thread Amit Assudani
Also, I get TaskContext.get() as null when used in the foreach function below (I get it when I use it in map, but the whole point here is to handle something that is breaking in an action). Please help. :( From: amit assudani aassud...@impetus.com Date: Friday, June 26, 2015
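For later readers hitting the same null: TaskContext.get() is only set on threads that are actually executing a task on an executor, so it returns null in driver-side code such as the body of foreachRDD outside of any action closure. A Scala sketch of the distinction, assuming an existing DStream named dstream:

    import org.apache.spark.TaskContext

    dstream.foreachRDD { rdd =>
      // Driver side: no task is running on this thread, so the context is null.
      assert(TaskContext.get() == null)

      rdd.foreachPartition { records =>
        // Executor side, inside a running task: the context is available.
        val attempt = TaskContext.get().attemptNumber()
        records.foreach(println)   // placeholder processing
      }
    }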