Thanks for the info ! Shing
On Tuesday, 5 May 2015, 15:11, Imran Rashid <iras...@cloudera.com> wrote: You might be interested in https://issues.apache.org/jira/browse/SPARK-6593 and the discussion around the PRs. This is probably more complicated than what you are looking for, but you could copy the code for HadoopReliableRDD in the PR into your own code and use it, without having to wait for the issue to get resolved. On Sun, May 3, 2015 at 12:57 PM, Shing Hing Man <mat...@yahoo.com.invalid> wrote: Hi, I am using Spark 1.3.1 to read a directory of about 2000 avro files. The avro files are from a third party and a few of them are corrupted. val path = "{myDirecotry of avro files}" val sparkConf = new SparkConf().setAppName("avroDemo").setMaster("local") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val data = sqlContext.avroFile(path); data.select(.....) When I run the above code, I get the following exception. org.apache.avro.AvroRuntimeException: java.io.IOException: Invalid sync! at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:222) ~[classes/:1.7.7] at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7] at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7] at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) ~[spark-core_2.10-1.3.1.jar:1.3.1] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:129) ~[spark-sql_2.10-1.3.1.jar:1.3.1] at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:126) ~[spark-sql_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.1.jar:1.3.1] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]Caused by: java.io.IOException: Invalid sync! at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:314) ~[classes/:1.7.7] at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:209) ~[classes/:1.7.7] ... 25 common frames omitted Is there an easy way to skip a corrupted avro file without reading the files one by one using sqlContext.avroFile(file) ? At present, my solution (hack) is to have my own version of org.apache.avro.file.DataFileStream with method hasNext returns false (to signal the end file), when java.io.IOException: Invalid sync! is thrown. Please see line 210 in https://github.com/apache/avro/blob/branch-1.7/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java Thanks in advance for any assistance ! Shing