Hi Jayesh

So you have two problems here:

1) Data was loaded in the wrong format
2) Once the job has picked up the bad data, it will keep retrying the
failed batch

For 2, it's very easy to go into the checkpoint directory and delete that
offset manually, making it seem like the batch never happened.

Point 1, however, is a little trickier. If you receive bad data, then
perhaps your first port of call should be a cleaning process that ensures
your data is at least parsable and then moves it to another directory,
which Spark Streaming is watching.
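
As a rough illustration of such a cleaning step, here is a minimal sketch
in plain Python (no Spark involved). The column layout (an integer id
followed by a name, no header row), the directory names and the function
names are all assumptions made up for this example, not anything from your
job. It checks that every row of each CSV file parses, moves good files
into the directory the stream watches, and moves the rest into a
quarantine directory.

```python
import csv
import shutil
from pathlib import Path

# Hypothetical schema: (column name, parser). Each parser must raise
# ValueError when the field cannot be converted.
EXPECTED_COLUMNS = [("id", int), ("name", str)]


def is_parsable(path, columns=EXPECTED_COLUMNS):
    """Return True if every non-empty row matches the expected columns."""
    with open(path, newline="") as handle:
        for row in csv.reader(handle):
            if not row:
                continue  # ignore completely blank lines
            if len(row) != len(columns):
                return False
            try:
                for value, (_, parse) in zip(row, columns):
                    parse(value)
            except ValueError:
                return False
    return True


def route_files(landing_dir, clean_dir, quarantine_dir):
    """Move parsable CSV files to clean_dir, all others to quarantine_dir."""
    moved = {"clean": [], "quarantined": []}
    for directory in (clean_dir, quarantine_dir):
        Path(directory).mkdir(parents=True, exist_ok=True)
    for path in sorted(Path(landing_dir).glob("*.csv")):
        if is_parsable(path):
            key, target_dir = "clean", clean_dir
        else:
            key, target_dir = "quarantined", quarantine_dir
        shutil.move(str(path), str(Path(target_dir) / path.name))
        moved[key].append(path.name)
    return moved
```

You would run something like route_files("landing", "clean", "quarantine")
on a schedule and point the streaming job at the clean directory only. If
the landing and clean directories are on the same filesystem, the move is
a rename, so the stream should not see half-written files. The quarantine
directory then acts as a crude dead letter queue.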

It is unreasonable to expect Spark to both do the streaming and handle bad
data for you, yet remain extremely simple and easy to use.

That said, I personally would have a conversation with the provider of the
data.

In this scenario I would just make sure that the provider guarantees the
format of the data is correct, whether it's CSV, JSON, Avro, Parquet or
whatever. I would hope that whatever service/company is providing this data
is providing it "correctly" to a set definition; otherwise you will have to
do a pre-cleaning step.


Perhaps someone else can suggest a better/cleaner approach

Regards
Sam

On Thu, Feb 23, 2017 at 2:09 PM, JayeshLalwani <
jayesh.lalw...@capitalone.com> wrote:

> What is a good way to make a Structured Streaming application deal with bad
> input? Right now, the problem is that bad input kills the Structured
> Streaming application. This is highly undesirable, because a Structured
> Streaming application has to be always on
>
> For example, here is a very simple structured streaming program
>
>
>
>
> Now, I drop in a CSV file with the following data into my bucket
>
>
>
> Obviously the data is in the wrong format
>
> The executor and driver come crashing down
> 17/02/23 08:53:40 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)
> java.lang.NumberFormatException: For input string: "Iron man"
>         at
> java.lang.NumberFormatException.forInputString(
> NumberFormatException.java:65)
>         at java.lang.Integer.parseInt(Integer.java:580)
>         at java.lang.Integer.parseInt(Integer.java:615)
>         at scala.collection.immutable.StringLike$class.toInt(
> StringLike.scala:272)
>         at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(
> CSVInferSchema.scala:250)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$
> csvParser$3.apply(CSVRelation.scala:125)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$
> csvParser$3.apply(CSVRelation.scala:94)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$
> buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:167)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$
> buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:166)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:102)
>         at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:166)
>         at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:102)
>         at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown
> Source)
>         at
> org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
>         at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>         at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:231)
>         at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:225)
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:826)
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:826)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
> scala:87)
>         at org.apache.spark.scheduler.Task.run(Task.scala:99)
>         at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:282)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 17/02/23 08:53:40 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> localhost, executor driver): java.lang.NumberFormatException: For input
> string: "Iron man"
>         at
> java.lang.NumberFormatException.forInputString(
> NumberFormatException.java:65)
>         at java.lang.Integer.parseInt(Integer.java:580)
>         at java.lang.Integer.parseInt(Integer.java:615)
>         at scala.collection.immutable.StringLike$class.toInt(
> StringLike.scala:272)
>         at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(
> CSVInferSchema.scala:250)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$
> csvParser$3.apply(CSVRelation.scala:125)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$
> csvParser$3.apply(CSVRelation.scala:94)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$
> buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:167)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$
> buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:166)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:102)
>         at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:166)
>         at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:102)
>         at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown
> Source)
>         at
> org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
>         at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>         at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:231)
>         at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:225)
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:826)
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:826)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
> scala:87)
>         at org.apache.spark.scheduler.Task.run(Task.scala:99)
>         at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:282)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> 17/02/23 08:53:40 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times;
> aborting job
> 17/02/23 08:53:40 ERROR StreamExecution: Query [id =
> 2ea5adce-183a-4dc7-91f9-4c9aeecac440, runId =
> 3768bd1f-1ecf-427e-8d5f-e64592678dbe] terminated with error
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in
> stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0
> (TID 0, localhost, executor driver): java.lang.NumberFormatException: For
> input string: "Iron man"
>         at
> java.lang.NumberFormatException.forInputString(
> NumberFormatException.java:65)
>         at java.lang.Integer.parseInt(Integer.java:580)
>         at java.lang.Integer.parseInt(Integer.java:615)
>         at scala.collection.immutable.StringLike$class.toInt(
> StringLike.scala:272)
>         at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(
> CSVInferSchema.scala:250)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$
> csvParser$3.apply(CSVRelation.scala:125)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$
> csvParser$3.apply(CSVRelation.scala:94)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$
> buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:167)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$
> buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:166)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:102)
>         at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:166)
>         at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:102)
>         at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown
> Source)
>         at
> org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
>         at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>         at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:231)
>         at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:225)
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:826)
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:826)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
> scala:87)
>         at org.apache.spark.scheduler.Task.run(Task.scala:99)
>         at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:282)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>         at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
> scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1435)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1423)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1422)
>         at
> scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(
> ArrayBuffer.scala:48)
>         at
> org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1422)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>         at scala.Option.foreach(Option.scala:257)
>         at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
> DAGScheduler.scala:802)
>         at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> doOnReceive(DAGScheduler.scala:1650)
>         at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1605)
>         at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1594)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>         at org.apache.spark.scheduler.DAGScheduler.runJob(
> DAGScheduler.scala:628)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
>         at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.
> scala:935)
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>         at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
>         at
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.
> scala:275)
>         at
> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$
> Dataset$$execute$1$1.apply(Dataset.scala:2371)
>         at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(
> SQLExecution.scala:57)
>         at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.
> scala:2765)
>         at
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$
> execute$1(Dataset.scala:2370)
>         at
> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$
> Dataset$$collect$1.apply(Dataset.scala:2375)
>         at
> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$
> Dataset$$collect$1.apply(Dataset.scala:2375)
>         at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2778)
>         at
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$
> collect(Dataset.scala:2375)
>         at org.apache.spark.sql.Dataset.collect(Dataset.scala:2351)
>         at
> org.apache.spark.sql.execution.streaming.ConsoleSink.addBatch(console.
> scala:49)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$
> apache$spark$sql$execution$streaming$StreamExecution$$
> runBatch$1.apply$mcV$sp(StreamExecution.scala:503)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$
> apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(
> StreamExecution.scala:503)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$
> apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(
> StreamExecution.scala:503)
>         at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.
> reportTimeTaken(ProgressReporter.scala:262)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(
> StreamExecution.scala:46)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$
> spark$sql$execution$streaming$StreamExecution$$runBatch(
> StreamExecution.scala:502)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$
> apache$spark$sql$execution$streaming$StreamExecution$$
> runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$
> apache$spark$sql$execution$streaming$StreamExecution$$
> runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$
> apache$spark$sql$execution$streaming$StreamExecution$$
> runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
>         at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.
> reportTimeTaken(ProgressReporter.scala:262)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(
> StreamExecution.scala:46)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$
> apache$spark$sql$execution$streaming$StreamExecution$$
> runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
>         at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.
> execute(TriggerExecutor.scala:43)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$
> spark$sql$execution$streaming$StreamExecution$$runBatches(
> StreamExecution.scala:239)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(
> StreamExecution.scala:177)
> Caused by: java.lang.NumberFormatException: For input string: "Iron man"
>         at
> java.lang.NumberFormatException.forInputString(
> NumberFormatException.java:65)
>         at java.lang.Integer.parseInt(Integer.java:580)
>         at java.lang.Integer.parseInt(Integer.java:615)
>         at scala.collection.immutable.StringLike$class.toInt(
> StringLike.scala:272)
>         at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(
> CSVInferSchema.scala:250)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$
> csvParser$3.apply(CSVRelation.scala:125)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$
> csvParser$3.apply(CSVRelation.scala:94)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$
> buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:167)
>         at
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$
> buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:166)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:102)
>         at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:166)
>         at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:102)
>         at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown
> Source)
>         at
> org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
>         at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>         at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:231)
>         at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:225)
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:826)
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:826)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
> scala:87)
>         at org.apache.spark.scheduler.Task.run(Task.scala:99)
>         at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:282)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> This is extremely undesirable because we cannot control what data comes
> over
> the wire. Yes, we can always restart the driver, but once the driver
> restarts, the checkpointing mechanism makes it read the bad data and it
> crashes again. Essentially, bad records become poison messages. The only
> way
> the system can recover is by human intervention
>
> Ideally, bad records should be written to some sort of dead letter queue
> (or
> I guess a dead letter dataframe), so the application can then do something
> with it rather than crash
>
>
>
