[ https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887092#comment-15887092 ]
Shixiong Zhu commented on SPARK-19738:
--------------------------------------

[~gaaldornick] could you check if SPARK-18699 is enough? It will put all bad records into a column specified by the user.

> Consider adding error handler to DataStreamWriter
> -------------------------------------------------
>
>                 Key: SPARK-19738
>                 URL: https://issues.apache.org/jira/browse/SPARK-19738
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Jayesh lalwani
>
> For Structured Streaming implementations, it is important that the
> applications stay always on. However, right now, errors stop the driver. In
> some cases this is not desirable behavior. For example, I have the following
> application:
> {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").start()
> {code}
> I send the following input to it:
> {quote}
> 1,Iron man
> 2,Superman
> {quote}
> Obviously, the data is bad: the "age" column receives a string rather than an
> integer. This causes the executor to throw an exception that propagates to
> the driver, which promptly shuts down. The driver is running in supervised
> mode, so it gets restarted, reads the same bad input, and shuts down again.
> This goes on ad infinitum. This behavior is desirable in cases where the
> error is recoverable: for example, if the executor cannot talk to the
> database, we want the application to keep retrying the same input until the
> database recovers. However, in other cases this behavior is undesirable, and
> we do not want it to happen when the input is bad. Instead, we might want to
> put the bad record in some sort of dead-letter queue. Or maybe we want to
> kill the driver only when the number of errors has crossed a certain
> threshold. Or maybe we want to email someone.
> Proposal:
> Add an error handler to the data stream. When the executor fails, it should
> call the error handler and pass the exception to it. The error handler could
> swallow the exception, transform it, update counts in an accumulator, etc.
> {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start()
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)