[ 
https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887092#comment-15887092
 ] 

Shixiong Zhu commented on SPARK-19738:
--------------------------------------

[~gaaldornick] could you check if SPARK-19738 is enough? It will put all bad 
records into a column specified by the user.
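
For example, something along these lines (a rough sketch of your example; the 
option names assume the PERMISSIVE mode / columnNameOfCorruptRecord behavior 
from SPARK-18699 applies to the streaming CSV source):

{code}
import org.apache.spark.sql.types._

// Add an extra string column to the schema to receive the raw text of any
// row that fails to parse (the column name here is illustrative).
val userSchema = new StructType()
  .add("name", "string")
  .add("age", "integer")
  .add("_corrupt_record", "string")

val csvDF = spark.readStream
  .schema(userSchema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .csv("s3://bucket/jayesh/streamingerror/")

// Good rows parse normally; bad rows land in _corrupt_record, so they can be
// filtered out or routed to a dead letter sink instead of failing the query.
csvDF.writeStream.format("console").start()
{code}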

> Consider adding error handler to DataStreamWriter
> -------------------------------------------------
>
>                 Key: SPARK-19738
>                 URL: https://issues.apache.org/jira/browse/SPARK-19738
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Jayesh lalwani
>
> For Structured Streaming applications, it is important that they stay always 
> on. However, right now, errors stop the driver. In some cases, this is not 
> desirable behavior. For example, I have the following application:
> {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = 
> spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").start()
> {code}
> I send the following input to it 
> {quote}
> 1,Iron man
> 2,SUperman
> {quote}
> Obviously, the data is bad. This causes the executor to throw an exception 
> that propagates to the driver, which promptly shuts down. The driver is 
> running in supervised mode, and it gets restarted. The application reads the 
> same bad input and shuts down again. This goes on ad infinitum. This behavior 
> is desirable in cases where the error is recoverable. For example, if the 
> executor cannot talk to the database, we want the application to keep trying 
> the same input again and again until the database recovers. However, in other 
> cases this behavior is undesirable. We do not want this to happen when the 
> input is bad. We want to put the bad record in some sort of dead letter 
> queue. Or maybe we want to kill the driver only when the number of errors has 
> crossed a certain threshold. Or maybe we want to email someone.
> Proposal:
> Add an error handler to the data stream. When the executor fails, it should 
> call the error handler and pass it the exception. The error handler could 
> swallow the exception, transform it, update counts in an accumulator, etc.
>  {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = 
> spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start()
> {code}
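> A hypothetical shape for such a handler (names are illustrative; this is not 
> an existing Spark API, just a sketch of what I have in mind):
> {code}
> // Hypothetical interface; called on the executor when processing a record fails.
> trait ErrorHandler extends Serializable {
>   // Return true to skip the record and keep the query running,
>   // false to rethrow and fail the query as it does today.
>   def handle(record: String, cause: Throwable): Boolean
> }
>
> // Example: log the bad record to stderr and keep going.
> class LoggingErrorHandler extends ErrorHandler {
>   override def handle(record: String, cause: Throwable): Boolean = {
>     System.err.println(s"Dropping bad record: $record (${cause.getMessage})")
>     true
>   }
> }
> {code}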


