[jira] [Commented] (SPARK-19885) The config ignoreCorruptFiles doesn't work for CSV

2017-03-10 Thread Wenchen Fan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905564#comment-15905564
 ] 

Wenchen Fan commented on SPARK-19885:
-

Oh, so this issue is already fixed by SPARK-18362 in Spark 2.2.

Since it's not a critical issue and SPARK-18362 is an optimization, we should 
not backport it. Let's just resolve this ticket.

> The config ignoreCorruptFiles doesn't work for CSV
> --
>
> Key: SPARK-19885
> URL: https://issues.apache.org/jira/browse/SPARK-19885
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Shixiong Zhu
> Fix For: 2.2.0
>
>
> CSVFileFormat.inferSchema doesn't use FileScanRDD so the SQL 
> "ignoreCorruptFiles" doesn't work.
> {code}
> java.io.EOFException: Unexpected end of input stream
>   at 
> org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:145)
>   at 
> org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
>   at java.io.InputStream.read(InputStream.java:101)
>   at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>   at 
> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>   at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>   at 
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
>   at 
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
>   at 
> scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
>   at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1112)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1112)
>   at 
> org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
>   at 
> org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:99)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>   at scala.Option.foreach(Option.scala:257)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at 
> 

[jira] [Commented] (SPARK-19885) The config ignoreCorruptFiles doesn't work for CSV

2017-03-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904810#comment-15904810
 ] 

Hyukjin Kwon commented on SPARK-19885:
--

Also, we recently introduced reading CSV from a text dataset, so I think there is 
a workaround even if we remove this. For example, this PR - 
https://github.com/apache/spark/pull/16854 - shows how to create a dataset from a 
text dataset containing CSV rows read via a custom input format. The encoding 
problem could be worked around that way (a sketch follows below).
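
A minimal sketch of that workaround, assuming the {{Dataset[String]}} overload of 
{{csv}} from the PR above (Spark 2.2); the path and options are illustrative, and 
the custom-input-format step for non-UTF-8 data is elided ({{spark.read.textFile}} 
itself assumes UTF-8):

{code}
import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder().appName("csv-from-text").getOrCreate()

// Load the raw lines as a text Dataset first, then let the CSV reader parse
// them; malformed rows can be dropped instead of failing the whole job.
val lines: Dataset[String] = spark.read.textFile("/path/to/csv/dir")
val df = spark.read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .csv(lines)

df.show()
{code}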

FYI, about a year ago I opened and later closed a PR to support non-compatible 
encodings in CSV - https://github.com/apache/spark/pull/11016. If we decide to 
support this, maybe we could try that approach by manually setting the record 
delimiter (see the sketch after this paragraph).
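
Purely as an illustration of what manually setting the record delimiter could look 
like ({{textinputformat.record.delimiter}} is the standard Hadoop property; whether 
the closed PR took exactly this route is not shown here):

{code}
// Assuming an existing SparkSession `spark`.
// Tell Hadoop's TextInputFormat to split records on a custom delimiter
// instead of the default byte-oriented "\n".
spark.sparkContext.hadoopConfiguration
  .set("textinputformat.record.delimiter", "\r\n")

// sc.textFile goes through TextInputFormat, so the setting above is honored.
val lines = spark.sparkContext.textFile("/path/to/file.csv")
{code}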

IMHO, that is not the correct way either, because judging from the references above 
I am not sure {{TextInputFormat}} supports other encoding types correctly.


[jira] [Commented] (SPARK-19885) The config ignoreCorruptFiles doesn't work for CSV

2017-03-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904747#comment-15904747
 ] 

Hyukjin Kwon commented on SPARK-19885:
--

Thank you for cc'ing me. To my knowledge, {{LineReader}} itself assumes the input 
is UTF-8. According to 
[MAPREDUCE-232|https://issues.apache.org/jira/browse/MAPREDUCE-232], it looks like 
[{{TextInputFormat}}|https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java]
 does not guarantee every encoding but officially supports only UTF-8 (as commented 
in 
[{{LineRecordReader#L147}}|https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java#L147]).
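
A quick byte-level check of why a byte-oriented line splitter effectively assumes 
UTF-8 or a compatible encoding - the newline delimiter is a single {{0x0a}} byte in 
UTF-8 but two bytes in UTF-16:

{code}
// "a,b\n" rendered as hex bytes in two encodings:
val utf8  = "a,b\n".getBytes("UTF-8").map(b => f"$b%02x").mkString(" ")
val utf16 = "a,b\n".getBytes("UTF-16LE").map(b => f"$b%02x").mkString(" ")
println(utf8)   // 61 2c 62 0a
println(utf16)  // 61 00 2c 00 62 00 0a 00
{code}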

So, it seems this only works if the encoding is compatible with UTF-8. IMHO, this 
is an incorrect implementation which we should remove. We should support other 
encodings when we actually start to read the data (not relying on 
{{TextInputFormat}}), but I guess that is a big job.

BTW, I think that because of SPARK-18362 the CSV datasource uses {{FileScanRDD}} 
for schema inference only when the encoding is the default, and I guess this 
probably would not work for JSON either (let me maybe open a PR for the JSON one 
soon).



[jira] [Commented] (SPARK-19885) The config ignoreCorruptFiles doesn't work for CSV

2017-03-09 Thread Wenchen Fan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904606#comment-15904606
 ] 

Wenchen Fan commented on SPARK-19885:
-

This is because we support different charsets for CSV files while our text file 
format only supports UTF-8, so we have to use `HadoopRDD` when inferring the schema 
for the CSV data source (see the sketch below).
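
For reference, a sketch of the user-facing combination being discussed (config and 
option names as in Spark 2.1, path illustrative): `spark.sql.files.ignoreCorruptFiles` 
is honoured by `FileScanRDD` at read time, while CSV schema inference with a 
per-source charset goes through `HadoopRDD` and bypasses it.

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Honoured by FileScanRDD at read time...
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// ...but schema inference for CSV uses a HadoopRDD (to support a per-source
// charset), so a corrupt file still fails the inference job.
val df = spark.read
  .option("header", "true")
  .option("charset", "ISO-8859-1")
  .csv("/path/to/csv/dir")
{code}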

I've checked the history: this feature has been there since the first day we 
introduced the CSV data source. However, all other text-based data sources support 
only UTF-8, and CSV with wholeFile enabled also supports only UTF-8.

Shall we just remove the support for different charsets, or support this feature 
for all text-based data sources?

cc [~hyukjin.kwon]
