Re: Structured Streaming: How to handle bad input
I think it is the user's responsibility to validate the input before feeding it in. https://databricks.gitbooks.io/databricks-spark-knowledge-base/best_practices/dealing_with_bad_data.html
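For example, one way to do that kind of up-front validation (a sketch only; the path, schema and column names below are invented, and it assumes a CSV source like the one in the stack trace further down) is to read the suspect fields as strings and cast them yourself, so a bad value turns into a null instead of an exception:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{StringType, StructType}

    val spark = SparkSession.builder.appName("validate-before-feeding").getOrCreate()

    // Declare every column as a string so parsing itself can never throw.
    val rawSchema = new StructType()
      .add("name", StringType)
      .add("age", StringType)

    val raw = spark.readStream.schema(rawSchema).csv("/path/to/input")

    // cast() returns null for values that do not parse, so this filter keeps
    // only the rows whose "age" really is an integer before downstream logic runs.
    val validated = raw
      .filter(col("age").cast("int").isNotNull)
      .withColumn("age", col("age").cast("int"))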
Re: Structured Streaming: How to handle bad input
> [quoted text of the original message and its stack trace trimmed; see the full post below]
Structured Streaming: How to handle bad input
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
Caused by: java.lang.NumberFormatException: For input string: "Iron man"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Integer.parseInt(Integer.java:580)
  at java.lang.Integer.parseInt(Integer.java:615)
  at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
  at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
  at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:250)
  at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:125)
  at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:94)
  at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:167)
  at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:166)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:166)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

This is extremely undesirable because we cannot control what data comes over the wire.
Yes, we can always restart the driver, but once the driver restarts, the checkpointing mechanism makes it read the bad data again and it crashes again. Essentially, bad records become poison messages; the only way the system can recover is by human intervention.

Ideally, bad records should be written to some sort of dead letter queue (or I guess a dead letter dataframe), so the application can then do something with them rather than crash.
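One way to get something close to that dead letter dataframe today (a sketch under assumptions: a file-based CSV source, an invented name/age schema, and made-up paths and checkpoint locations) is to read every column as a string, split the stream into rows that cast cleanly and rows that do not, and write the bad rows to their own sink so the main query keeps running:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{StringType, StructType}

    val spark = SparkSession.builder.appName("dead-letter-sketch").getOrCreate()

    // All strings, so a value like "Iron man" in an integer column cannot crash the parse.
    val rawSchema = new StructType()
      .add("name", StringType)
      .add("age", StringType)

    val raw = spark.readStream.schema(rawSchema).csv("/data/incoming")

    // A row is "bad" when the age field is present but does not cast to an int.
    val isBad = col("age").isNotNull && col("age").cast("int").isNull

    val good = raw.filter(!isBad).withColumn("age", col("age").cast("int"))
    val dead = raw.filter(isBad) // the dead-letter rows

    // Bad records accumulate in their own directory instead of killing the driver.
    good.writeStream
      .format("parquet")
      .option("checkpointLocation", "/chk/good")
      .start("/data/good")

    dead.writeStream
      .format("json")
      .option("checkpointLocation", "/chk/dead")
      .start("/data/dead-letters")

The csv reader also has a "mode" option (PERMISSIVE / DROPMALFORMED / FAILFAST), but whether it catches this particular cast failure depends on the Spark version, so the sketch above does not rely on it.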