Thanks for the info!
Shing
 

On Tuesday, 5 May 2015, 15:11, Imran Rashid <iras...@cloudera.com> wrote:

 You might be interested in https://issues.apache.org/jira/browse/SPARK-6593 
and the discussion around the PRs.
This is probably more complicated than what you are looking for, but you could 
copy the code for HadoopReliableRDD in the PR into your own code and use it, 
without having to wait for the issue to get resolved.
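
Until that gets merged, a cruder per-file workaround is also possible: list the avro files yourself, force a read of each one, and skip any file that throws. Below is a minimal sketch under two assumptions: it uses the spark-avro avroFile API from your post, and readGoodAvroFiles is just an illustrative name, not anything from the PR.

  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.spark.sql.{DataFrame, SQLContext}
  import com.databricks.spark.avro._  // assumed: spark-avro's implicit that adds sqlContext.avroFile

  // Illustrative helper (hypothetical name): read each avro file separately
  // and keep only the ones that can be read end to end.
  def readGoodAvroFiles(sqlContext: SQLContext, dir: String): Option[DataFrame] = {
    val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
    val files = fs.listStatus(new Path(dir)).map(_.getPath.toString)
    val good = files.flatMap { file =>
      try {
        val df = sqlContext.avroFile(file)
        df.count() // force a full read so mid-file corruption surfaces here
        Some(df)
      } catch {
        case e: Exception =>
          println(s"Skipping corrupt file: $file (${e.getMessage})")
          None
      }
    }
    good.reduceOption(_ unionAll _) // DataFrame.unionAll in Spark 1.3
  }

Note this reads every good file twice (once for count(), once for the real query), so it is a stopgap, not a real fix.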
On Sun, May 3, 2015 at 12:57 PM, Shing Hing Man <mat...@yahoo.com.invalid> 
wrote:


Hi, I am using Spark 1.3.1 to read a directory of about 2000 avro files. The 
avro files are from a third party and a few of them are corrupted.
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext
  import com.databricks.spark.avro._ // spark-avro's implicit adds avroFile to SQLContext

  val path = "{myDirectory of avro files}"
  val sparkConf = new SparkConf().setAppName("avroDemo").setMaster("local")
  val sc = new SparkContext(sparkConf)
  val sqlContext = new SQLContext(sc)
  val data = sqlContext.avroFile(path)
  data.select(.....)

When I run the above code, I get the following exception.
org.apache.avro.AvroRuntimeException: java.io.IOException: Invalid sync!
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:222) ~[classes/:1.7.7]
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na]
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:129) ~[spark-sql_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:126) ~[spark-sql_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.io.IOException: Invalid sync!
    at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:314) ~[classes/:1.7.7]
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:209) ~[classes/:1.7.7]
    ... 25 common frames omitted
Is there an easy way to skip a corrupted avro file without reading the files one by one with sqlContext.avroFile(file)? At present, my solution (a hack) is to have my own version of org.apache.avro.file.DataFileStream whose hasNext method returns false (to signal the end of the file) when java.io.IOException: Invalid sync! is thrown. Please see line 210 in
https://github.com/apache/avro/blob/branch-1.7/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
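
For illustration, the effect of that hack is roughly equivalent to wrapping the stream's iteration like this (a minimal sketch of the idea only; the SkipCorruptTail wrapper is hypothetical, not my actual patched class):

  import org.apache.avro.AvroRuntimeException
  import org.apache.avro.file.DataFileStream

  // Treat "Invalid sync!" as end-of-file instead of letting it fail the task.
  // DataFileStream.hasNext wraps the IOException in an AvroRuntimeException,
  // as the stack trace above shows.
  class SkipCorruptTail[T](stream: DataFileStream[T]) extends Iterator[T] {
    override def hasNext: Boolean =
      try stream.hasNext
      catch { case _: AvroRuntimeException => false } // stop at the first corrupt block
    override def next(): T = stream.next()
  }

This still loses whatever records sit after the first bad block in a file, which is why a supported skip-corrupt-files option would be much nicer.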
Thanks in advance for any assistance!
Shing




  
