Re: Structured Streaming: How to handle bad input

2017-02-27 Thread sasubillis
I think it is the user's responsibility to validate the input before feeding it to the stream.

https://databricks.gitbooks.io/databricks-spark-knowledge-base/best_practices/dealing_with_bad_data.html
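
For a CSV file stream like the one in this thread, one way to follow that advice is to read every column as a string and do the casting yourself, so a value that is not a number never reaches the reader's internal cast. A minimal sketch, assuming a two-column layout and illustrative paths that are not from the thread:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object ValidateBeforeCast {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("validate-before-cast").getOrCreate()

    // Read every field as a string so a malformed value never triggers a
    // numeric cast inside the CSV reader itself. (Schema and paths are
    // illustrative assumptions.)
    val rawSchema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", StringType)))

    val raw = spark.readStream
      .schema(rawSchema)
      .csv("/data/incoming")

    // Cast explicitly: a value like "Iron man" becomes NULL instead of
    // throwing NumberFormatException, and can be filtered out downstream.
    val typed = raw
      .withColumn("age", col("age").cast("int"))
      .filter(col("age").isNotNull)

    typed.writeStream
      .format("parquet")
      .option("checkpointLocation", "/data/checkpoints/valid")
      .start("/data/valid")
      .awaitTermination()
  }
}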



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-How-to-handle-bad-input-tp28420p28428.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Structured Streaming: How to handle bad input

2017-02-23 Thread Sam Elamin


Structured Streaming: How to handle bad input

2017-02-23 Thread JayeshLalwani
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
Caused by: java.lang.NumberFormatException: For input string: "Iron man"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:250)
at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:125)
at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:94)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:167)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:166)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:166)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

This is extremely undesirable because we cannot control what data comes over
the wire. Yes, we can always restart the driver, but once the driver
restarts, the checkpointing mechanism makes it re-read the bad data and it
crashes again. Essentially, bad records become poison messages. The only way
the system can recover is by human intervention.

Ideally, bad records should be written to some sort of dead letter queue (or
I guess a dead letter dataframe), so the application can do something with
them rather than crash.
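
One way to approximate that dead-letter behaviour with the current API is to keep the file source schema all-strings and split the stream yourself: rows that cast cleanly go to the normal sink, rows that do not go to a separate "bad records" sink instead of killing the query. A hedged sketch, where the column names, paths and output formats are assumptions for illustration rather than details from the original post:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object DeadLetterSplit {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("dead-letter-split").getOrCreate()

    // All-string schema: the CSV reader itself never casts, so a bad value
    // cannot poison the checkpointed batch. (Schema and paths are assumed.)
    val rawSchema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", StringType)))

    val raw = spark.readStream.schema(rawSchema).csv("/data/incoming")

    // A row is "bad" when the value is present but does not cast cleanly.
    val badRow = col("age").isNotNull && col("age").cast("int").isNull

    // Good rows: cast and continue as normal.
    val good = raw.filter(!badRow).withColumn("age", col("age").cast("int"))
    good.writeStream
      .format("parquet")
      .option("checkpointLocation", "/data/checkpoints/good")
      .start("/data/good")

    // Bad rows: the "dead letter dataframe" -- park them for inspection.
    raw.filter(badRow).writeStream
      .format("json")
      .option("checkpointLocation", "/data/checkpoints/bad")
      .start("/data/bad")

    spark.streams.awaitAnyTermination()
  }
}

Depending on the Spark version, the CSV reader's mode option (for example DROPMALFORMED) may also skip bad lines, but it drops them silently rather than capturing them, which is why an explicit split like the one above can be preferable.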



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-How-to-handle-bad-input-tp28420.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org