You might be interested in https://issues.apache.org/jira/browse/SPARK-6593
and the discussion around the PRs.

This is probably more complicated than what you are looking for, but you
could copy the code for HadoopReliableRDD in the PR into your own code and
use it, without having to wait for the issue to get resolved.
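
For illustration only, here is a minimal sketch of the general idea, with no
Spark or Avro dependency. It is not the code from the PR. The names
SkipOnErrorIterator and CorruptReaderDemo are made up for this example, and
it assumes the failure surfaces from hasNext as an IOException, possibly
wrapped in a RuntimeException, as in your stack trace.

```scala
import java.io.IOException

// Hypothetical wrapper (not from Spark, Avro, or the PR): treats an
// IOException raised by the underlying iterator as end of input.
class SkipOnErrorIterator[A](underlying: Iterator[A]) extends Iterator[A] {
  private var failed = false

  // True for an IOException, or a RuntimeException caused by one
  // (AvroRuntimeException wraps the IOException in the trace above).
  private def isIoFailure(e: Exception): Boolean = e match {
    case _: IOException      => true
    case r: RuntimeException => r.getCause.isInstanceOf[IOException]
    case _                   => false
  }

  override def hasNext: Boolean =
    !failed && (try underlying.hasNext catch {
      case e: Exception if isIoFailure(e) =>
        failed = true // remember the failure; never touch the source again
        false
    })

  override def next(): A =
    if (hasNext) underlying.next()
    else throw new NoSuchElementException("next on empty iterator")
}

object CorruptReaderDemo {
  // Simulated reader: two good records, then a corrupt block.
  def corrupt: Iterator[Int] = new Iterator[Int] {
    private var i = 0
    def hasNext: Boolean = {
      if (i == 2) throw new RuntimeException(new IOException("Invalid sync!"))
      true
    }
    def next(): Int = { i += 1; i }
  }
}
```

Calling new SkipOnErrorIterator(CorruptReaderDemo.corrupt).toList returns
the two good records and stops quietly at the corrupt block. Note the
trade-off: anything after the first bad block is dropped silently, so you
would probably want to log the failure somewhere.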

On Sun, May 3, 2015 at 12:57 PM, Shing Hing Man <mat...@yahoo.com.invalid>
wrote:

>
> Hi,
>  I am using Spark 1.3.1 to read a directory of about 2000 avro files.
> The avro files are from a third party and a few of them are corrupted.
>
>   val path = "{myDirectory of avro files}"
>
>  val sparkConf = new SparkConf().setAppName("avroDemo").setMaster("local")
>   val sc = new SparkContext(sparkConf)
>
>  val sqlContext = new SQLContext(sc)
>
>  val data = sqlContext.avroFile(path);
>  data.select(.....)
>
>  When I run the above code, I get the following exception.
>  org.apache.avro.AvroRuntimeException: java.io.IOException: Invalid sync!
>  at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:222)
> ~[classes/:1.7.7]
>  at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
> ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
>  at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
> ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
>  at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> ~[scala-library.jar:na]
>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> ~[scala-library.jar:na]
>  at
> org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:129)
> ~[spark-sql_2.10-1.3.1.jar:1.3.1]
>  at
> org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:126)
> ~[spark-sql_2.10-1.3.1.jar:1.3.1]
>  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at org.apache.spark.scheduler.Task.run(Task.scala:64)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> [na:1.7.0_71]
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> [na:1.7.0_71]
>  at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> Caused by: java.io.IOException: Invalid sync!
>  at
> org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:314)
> ~[classes/:1.7.7]
>  at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:209)
> ~[classes/:1.7.7]
>  ... 25 common frames omitted
>
>  Is there an easy way to skip a corrupted avro file without reading the
> files one by one using sqlContext.avroFile(file)?
>  At present, my solution (a hack) is to use my own version of
> org.apache.avro.file.DataFileStream in which hasNext returns false
> (to signal the end of the file) when
>  java.io.IOException: Invalid sync!
>  is thrown.
>  Please see line 210 in
>
> https://github.com/apache/avro/blob/branch-1.7/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
>
>   Thanks in advance for any assistance!
>   Shing
>
>
