1. onBatchError is not a bad idea; a sketch of what it might look like follows after this list.
2. It works for the Kafka Direct API and file streams as well. They all have batches. However, you will not get the number of records for the file stream.
3. Mind giving an example of the exception you would like to see caught?
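For reference, a sketch of what such a callback could look like. This is hypothetical: neither onBatchError nor StreamingListenerBatchError exists in Spark's StreamingListener API; it only illustrates the proposal in point 1.

import org.apache.spark.streaming.scheduler.StreamingListener

// Hypothetical event carrying the failed batch time and the cause.
case class StreamingListenerBatchError(batchTimeMs: Long, reason: Throwable)

// Hypothetical listener extension illustrating the proposed callback.
trait StreamingListenerWithBatchError extends StreamingListener {
  def onBatchError(batchError: StreamingListenerBatchError): Unit = {}
}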
TD
On Wed, Jul 1, 2015 at 10:35 AM,
Hi Cody and TD,
Just trying to understand this under the hood, but I cannot find the place
for this specific logic: once you reach max failures, the whole stream
will stop.
If possible, could you point me in the right direction?
From my understanding, the exception thrown from the job would
Hi TD,
Why don’t we have an onBatchError or similar method in StreamingListener?
Also, is StreamingListener only for the receiver-based approach, or does it work
for the Kafka Direct API / file-based streaming as well?
Regards,
Amit
From: Tathagata Das t...@databricks.com
Thanks TD, this helps.
Looking forward to a fix where the framework handles batch failures via
callback methods. That would avoid having to write try/catch around every
transformation / action.
Regards,
Amit
From: Tathagata Das t...@databricks.com
Date:
Also, how do you suggest catching exceptions while using connector APIs
like saveAsNewAPIHadoopFiles?
From: amit assudani aassud...@impetus.com
Date: Monday, June 29, 2015 at 9:55 AM
To: Tathagata Das t...@databricks.com
Cc: Cody
I recommend writing using dstream.foreachRDD, and then calling
rdd.saveAsNewAPIHadoopFile inside a try/catch. See the implementation of
dstream.saveAsNewAPIHadoopFiles:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L716
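A minimal sketch of that pattern, assuming a DStream of key/value pairs named stream; the output path and classes are illustrative, not from the original mail:

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Save each batch yourself so the exception is catchable on the driver.
stream.foreachRDD { (rdd, time) =>
  try {
    rdd.saveAsNewAPIHadoopFile(
      s"hdfs:///tmp/output-${time.milliseconds}",
      classOf[String], classOf[String],
      classOf[TextOutputFormat[String, String]])
  } catch {
    case e: Exception =>
      // Reached once the batch job has failed (after all task retries);
      // handle it here instead of letting it be silently ignored.
      System.err.println(s"Batch ${time.milliseconds} failed: ${e.getMessage}")
  }
}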
On
I looked at the code and found that batch exceptions are indeed ignored.
This is worth fixing: batch exceptions should not be silently ignored.
Also, you can catch failed batch jobs (irrespective of the number of
retries) by catching the exception in foreachRDD. Here is an
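A minimal sketch of that idea; the names stream and process are illustrative assumptions, not the original example, and the bookkeeping answers "how to keep track of those failed batches":

var failedBatchTimes = Vector.empty[Long] // driver-side bookkeeping

stream.foreachRDD { (rdd, time) =>
  try {
    rdd.foreach(record => process(record)) // `process` is a stand-in for your action
  } catch {
    case e: Exception =>
      // Only reached after the job has exhausted its task retries.
      failedBatchTimes :+= time.milliseconds
  }
}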
Hmm, not sure why, but when I run this code, it always keeps consuming from
Kafka and proceeds, ignoring the previously failed batches.
Also, now that I get the attempt number from TaskContext and I know the max
retries, I am supposed to handle it in the try/catch block, but does it
No, if you have a bad message that you are continually throwing exceptions
on, your stream will not progress to future batches.
On Fri, Jun 26, 2015 at 10:28 AM, Amit Assudani aassud...@impetus.com
wrote:
Also, what I understand is that max failures doesn’t stop the entire stream;
it fails the
If you're consistently throwing exceptions and thus failing tasks, once you
reach max failures the whole stream will stop.
It's up to you to either catch those exceptions, or restart your stream
appropriately once it stops.
Keep in mind that if you're relying on checkpoints, and fixing the error
Problem: how do we recover from user errors (connectivity issues / storage
service down / etc.)?
Environment: Spark streaming using Kafka Direct Streams
Code Snippet:
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(kafkaTopic1));
HashMap<String, String> kafkaParams = new HashMap<String,
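For reference, the equivalent direct-stream setup in Scala as a sketch (Spark 1.x / Kafka 0.8 direct API); the broker address, topic name, and ssc are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct (receiver-less) Kafka stream; offsets are tracked per batch.
val topicsSet = Set("kafkaTopic1")
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)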
Thanks for the quick response.
My question here is: how do I know that the max retries are done (because in my
code I never know whether it is the failure of the first try or of the last try)
and that I need to handle this message; is there any callback?
Also, I know the limitations of checkpoints when upgrading
TaskContext has an attemptNumber method on it.
If you want to know which messages failed, you have access to the offsets,
and can do whatever you need to with them.
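A minimal sketch of both, assuming a direct Kafka stream named stream, a stand-in process function, and a maxFailures value matching spark.task.maxFailures (all assumptions, not from the original mail):

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  // The Kafka offsets for this batch, available on the driver.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach(r =>
    println(s"${r.topic} ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))

  rdd.foreachPartition { records =>
    // TaskContext is only available in task code running on executors.
    val attempt = TaskContext.get().attemptNumber() // 0 on the first try
    records.foreach { record =>
      try {
        process(record)
      } catch {
        case e: Exception if attempt < maxFailures - 1 =>
          throw e // not the last attempt yet: let Spark retry the task
        case e: Exception =>
          // Last attempt: handle/skip the bad record instead of
          // failing the batch for good.
          System.err.println(s"Giving up on record after $maxFailures attempts: $e")
      }
    }
  }
}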
On Fri, Jun 26, 2015 at 10:21 AM, Amit Assudani aassud...@impetus.com
wrote:
Thanks for the quick response.
My question here is
Also, what I understand is that max failures doesn’t stop the entire stream; it
fails the job created for the specific batch, but the subsequent batches still
proceed. Isn’t that right? And the question still remains: how do we keep track
of those failed batches?
From: amit assudani
Also, TaskContext.get() returns null when used in the foreach function below (I
get it when I use it in map, but the whole point here is to handle something
that is breaking in the action). Please help. :(
From: amit assudani aassud...@impetus.com
Date: Friday, June 26, 2015