[jira] [Commented] (SPARK-19885) The config ignoreCorruptFiles doesn't work for CSV
[ https://issues.apache.org/jira/browse/SPARK-19885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905564#comment-15905564 ]

Wenchen Fan commented on SPARK-19885:
--------------------------------------

Oh, so this issue is already fixed by SPARK-18362 in Spark 2.2. Since it's not a critical issue and SPARK-18362 is an optimization, we should not backport it. Let's just resolve this ticket.

> The config ignoreCorruptFiles doesn't work for CSV
> --------------------------------------------------
>
>                 Key: SPARK-19885
>                 URL: https://issues.apache.org/jira/browse/SPARK-19885
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Shixiong Zhu
>             Fix For: 2.2.0
>
> CSVFileFormat.inferSchema doesn't use FileScanRDD, so the SQL config "ignoreCorruptFiles" doesn't work.
> {code}
> java.io.EOFException: Unexpected end of input stream
> 	at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:145)
> 	at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
> 	at java.io.InputStream.read(InputStream.java:101)
> 	at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
> 	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
> 	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
> 	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
> 	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
> 	at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
> 	at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
> 	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
> 	at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1112)
> 	at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1112)
> 	at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
> 	at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:99)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> 	at scala.Option.foreach(Option.scala:257)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
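For anyone reproducing the reported behavior, a minimal sketch (the paths are hypothetical; per the description, this applies to Spark 2.1, where CSV schema inference did not go through {{FileScanRDD}}):

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("SPARK-19885-repro").getOrCreate()

// The flag below is honored by FileScanRDD when reading rows, but in 2.1 CSV
// schema inference runs on a plain HadoopRDD, so a truncated .csv.gz part
// file still fails inference with the EOFException shown above.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")   // schema inference is the failing step
  .csv("/data/logs/*.csv.gz")      // hypothetical directory with one corrupt gzip
{code}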
[jira] [Commented] (SPARK-19885) The config ignoreCorruptFiles doesn't work for CSV
[ https://issues.apache.org/jira/browse/SPARK-19885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904810#comment-15904810 ]

Hyukjin Kwon commented on SPARK-19885:
--------------------------------------

Also, we recently introduced reading CSV from a text dataset. I think there is a workaround even if we remove this. For example, this PR - https://github.com/apache/spark/pull/16854 - illustrates creating a dataset from a text dataset that contains CSV rows, via a custom input format. The encoding problem might be worked around this way. FYI, I opened and closed a PR before to support incompatible encodings in CSV - https://github.com/apache/spark/pull/11016 - about a year ago. Maybe we could try that approach, manually setting the record delimiter, if we decide to support this. IMHO, that is not the correct way either, because I am not sure {{TextInputFormat}} supports other encoding types correctly, per the references above.
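For reference, a minimal sketch of the text-dataset workaround mentioned above (assuming the {{Dataset[String]}} overload of {{DataFrameReader.csv}} added for Spark 2.2; the path is hypothetical):

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-from-text").getOrCreate()

// Read the raw file as plain text first; corrupt or oddly-encoded lines can
// be filtered or repaired here, before any CSV parsing happens.
val lines = spark.read.textFile("/data/rows.csv").filter(_.nonEmpty)

// Spark 2.2+: DataFrameReader.csv also accepts a Dataset[String] of CSV rows.
val df = spark.read.option("header", "true").csv(lines)
df.show()
{code}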
[jira] [Commented] (SPARK-19885) The config ignoreCorruptFiles doesn't work for CSV
[ https://issues.apache.org/jira/browse/SPARK-19885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904747#comment-15904747 ]

Hyukjin Kwon commented on SPARK-19885:
--------------------------------------

Thank you for cc'ing me. To my knowledge, {{LineReader}} itself assumes the input is UTF-8. According to [MAPREDUCE-232|https://issues.apache.org/jira/browse/MAPREDUCE-232], it looks like [{{TextInputFormat}}|https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java] does not guarantee support for all encodings but officially supports only UTF-8 (as commented in [{{LineRecordReader#L147}}|https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java#L147]). So, it seems this only works for encodings compatible with UTF-8. IMHO, this is an incorrect implementation which we should remove. We should support other encodings at the point where we actually start to read the data (not relying on {{TextInputFormat}}), but I guess that is a big job. BTW, I think we use {{FileScanRDD}} for schema inference in the CSV datasource only when the encoding is the default, in favour of SPARK-18362, and I guess this probably would not work for JSON either (let me maybe open a PR for the JSON one soon).
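To make the code-path split concrete, a hedged sketch ({{spark}} is an active {{SparkSession}}, the paths are hypothetical, and the behavior is as described in the comments, i.e. Spark 2.2 after SPARK-18362):

{code}
// With the default UTF-8 encoding, CSV schema inference goes through
// FileScanRDD and honors the flag; with an explicit non-UTF-8 charset it
// falls back to HadoopRDD, where the flag is ignored.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// FileScanRDD path: a corrupt part file is skipped during inference.
spark.read.option("inferSchema", "true").csv("/data/utf8/*.csv.gz").printSchema()

// HadoopRDD path: the same kind of corrupt file still fails inference with
// java.io.EOFException, as in the stack trace above.
spark.read
  .option("charset", "ISO-8859-1")   // "encoding" works as an alias
  .option("inferSchema", "true")
  .csv("/data/latin1/*.csv.gz")
  .printSchema()
{code}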
[jira] [Commented] (SPARK-19885) The config ignoreCorruptFiles doesn't work for CSV
[ https://issues.apache.org/jira/browse/SPARK-19885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904606#comment-15904606 ]

Wenchen Fan commented on SPARK-19885:
--------------------------------------

This is because we support different charsets for CSV files, while our text file format only supports UTF-8, so we have to use {{HadoopRDD}} when inferring the schema for the CSV data source. I've checked the history: this feature has been there since the first day we introduced the CSV data source. However, all other text-based data sources support only UTF-8, and CSV with wholeFile enabled also supports only UTF-8. Shall we just remove the support for different charsets, or support this feature for all text-based data sources? cc [~hyukjin.kwon]
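For context, the charset support being discussed is exposed as a CSV reader option; a minimal sketch ({{spark}} is an active {{SparkSession}}; the path is hypothetical):

{code}
// Reading a Latin-1 encoded CSV. It is this option that forces the
// HadoopRDD-based code path for inference, since the built-in text
// source assumes UTF-8.
val latin1 = spark.read
  .option("charset", "ISO-8859-1")  // "encoding" works as an alias
  .option("header", "true")
  .csv("/data/latin1.csv")
{code}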