Hi Jayesh,

So you have two problems here:
1) Data was loaded in the wrong format.
2) Even once you have dealt with the bad data, the Spark job will keep retrying the failed batch.

For point 2, it's very easy to go into the checkpoint directory and delete that offset manually, making it seem like the batch never happened.

Point 1 is a little trickier. If you receive bad data, perhaps your first port of call should be a cleaning process that ensures your data is at least parsable and then moves it to another directory that Spark Streaming is watching. It is unreasonable to expect Spark to both do the streaming and handle bad data for you, yet remain extremely simple and easy to use.

That said, I personally would have a conversation with the provider of the data. In this scenario I just make sure the providers guarantee that the format of the data is correct, whether it's CSV, JSON, Avro, Parquet or whatever. I would hope that whatever service or company is providing this data provides it "correctly", to a set definition; otherwise you will have to add a pre-cleaning step.

Perhaps someone else can suggest a better or cleaner approach.

Regards,
Sam

On Thu, Feb 23, 2017 at 2:09 PM, JayeshLalwani <jayesh.lalw...@capitalone.com> wrote:

> What is a good way to make a Structured Streaming application deal with bad input? Right now, the problem is that bad input kills the Structured Streaming application.
> This is highly undesirable, because a Structured Streaming application has to be always on.
>
> For example, here is a very simple structured streaming program:
>
>
> Now, I drop in a CSV file with the following data into my bucket:
>
>
> Obviously the data is in the wrong format.
>
> The executor and driver come crashing down:
>
> 17/02/23 08:53:40 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.NumberFormatException: For input string: "Iron man"
>     at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>     at java.lang.Integer.parseInt(Integer.java:580)
>     at java.lang.Integer.parseInt(Integer.java:615)
>     at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
>     at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
>     at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:250)
>     at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:125)
>     at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:94)
>     at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:167)
>     at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:166)
>     at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:166)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 17/02/23 08:53:40 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NumberFormatException: For input string: "Iron man"
>     [same stack trace as the executor error above]
>
> 17/02/23 08:53:40 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
> 17/02/23 08:53:40 ERROR StreamExecution: Query [id = 2ea5adce-183a-4dc7-91f9-4c9aeecac440, runId = 3768bd1f-1ecf-427e-8d5f-e64592678dbe] terminated with error
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NumberFormatException: For input string: "Iron man"
>     [same stack trace as the executor error above]
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>     at scala.Option.foreach(Option.scala:257)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
>     at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>     at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
>     at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
>     at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
>     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>     at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
>     at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
>     at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2375)
>     at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2375)
>     at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2778)
>     at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2375)
>     at org.apache.spark.sql.Dataset.collect(Dataset.scala:2351)
>     at org.apache.spark.sql.execution.streaming.ConsoleSink.addBatch(console.scala:49)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
> Caused by: java.lang.NumberFormatException: For input string: "Iron man"
>     [same stack trace as the executor error above]
>
> This is extremely undesirable because we cannot control what data comes over the wire. Yes, we can always restart the driver, but once the driver restarts, the checkpointing mechanism makes it read the bad data and it crashes again. Essentially, bad records become poison messages.
> The only way the system can recover is by human intervention.
>
> Ideally, bad records should be written to some sort of dead letter queue (or, I guess, a dead letter dataframe), so the application can then do something with them rather than crash.
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-How-to-handle-bad-input-tp28420.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
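The pre-cleaning step Sam suggests for point 1 could look roughly like the sketch below, which also writes rejected rows to a dead-letter directory along the lines Jayesh asks for. It is plain Python run before Spark sees the file, not a Spark feature, and everything in it is hypothetical: the two-column (name, age) schema, the helper names, and the directory layout. It assumes the files have no header row, and it writes each output under a dot-prefixed temporary name and then renames it, on the assumption that the watching streaming query skips dot-prefixed files and should only ever see complete files.

```python
import csv
import os
from pathlib import Path
from typing import Callable, List, Sequence, Tuple

# Hypothetical schema: one converter per column (name, age). Replace with the
# real layout of the incoming files. int() raises ValueError on input such as
# "Iron man", the value that crashed the streaming query in the trace above.
SCHEMA: Sequence[Callable[[str], object]] = (str, int)


def row_is_valid(row: Sequence[str],
                 schema: Sequence[Callable[[str], object]] = SCHEMA) -> bool:
    """True if the row has the expected column count and every value converts."""
    if len(row) != len(schema):
        return False
    try:
        for value, convert in zip(row, schema):
            convert(value)
    except ValueError:
        return False
    return True


def write_atomically(rows: List[List[str]], out_dir: Path, name: str) -> None:
    """Write rows to a hidden temp file in out_dir, then rename it into place.

    Assumption: the directory watcher ignores dot-prefixed files.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    tmp = out_dir / f".{name}.tmp"
    with tmp.open("w", newline="") as f:
        csv.writer(f).writerows(rows)
    os.replace(tmp, out_dir / name)


def split_csv(src: Path, clean_dir: Path, dead_letter_dir: Path) -> Tuple[int, int]:
    """Route parsable rows to clean_dir and the rest to dead_letter_dir.

    Returns (good_count, bad_count). Blank lines are skipped.
    """
    good: List[List[str]] = []
    bad: List[List[str]] = []
    with src.open(newline="") as f:
        for row in csv.reader(f):
            if not row:
                continue
            (good if row_is_valid(row) else bad).append(row)
    if good:
        write_atomically(good, clean_dir, src.name)
    if bad:
        write_atomically(bad, dead_letter_dir, src.name)
    return len(good), len(bad)
```

In this arrangement the streaming query would read only from the clean directory, so a malformed file lands in the dead-letter directory for someone to inspect instead of becoming a poison message. Keeping the validation rules in one place (the hypothetical SCHEMA) also gives you something concrete to agree on with the data provider.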
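For point 2, the manual checkpoint edit Sam describes might be scripted roughly as below. This is a sketch under assumptions, not a Spark tool: it assumes the query has been stopped, that it failed while running its newest micro-batch, and that the checkpoint keeps each batch's offsets in a file named after the numeric batch id under the checkpoint's offsets/ directory, possibly with a Hadoop ".<id>.crc" checksum file beside it. Depending on the source, other parts of the checkpoint may also record the batch, so back up the whole checkpoint before trying this. The function name and backup directory are hypothetical, and the sketch moves the entry aside instead of deleting it so the change can be undone.

```python
import shutil
from pathlib import Path
from typing import Optional


def quarantine_latest_offset(checkpoint_dir: str, backup_dir: str) -> Optional[int]:
    """Move the newest offset-log entry out of a (stopped) query's checkpoint.

    Assumed layout: <checkpoint_dir>/offsets/<batchId>, plus an optional
    checksum file .<batchId>.crc. Returns the batch id that was moved aside,
    or None if there was nothing to move.
    """
    offsets = Path(checkpoint_dir) / "offsets"
    if not offsets.is_dir():
        return None
    # Batch files have purely numeric names; compare them as integers so
    # that batch 10 sorts after batch 9.
    batch_ids = [int(p.name) for p in offsets.iterdir()
                 if p.is_file() and p.name.isdigit()]
    if not batch_ids:
        return None
    latest = max(batch_ids)
    backup = Path(backup_dir)
    backup.mkdir(parents=True, exist_ok=True)
    # Move (not delete) the entry and its checksum so the edit is reversible.
    for name in (str(latest), f".{latest}.crc"):
        src = offsets / name
        if src.exists():
            shutil.move(str(src), str(backup / name))
    return latest
```

For example, quarantine_latest_offset("/path/to/checkpoint", "/path/to/offsets-backup") returns the id of the batch it moved aside, or None if there was nothing to move; both paths are placeholders. On its own this only removes the record of the failed batch: unless the bad input has also been moved out of the watched directory (point 1), the restarted query may simply pick it up again.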