Hi,

Basically, you need to convert it to a serializable format before doing the collect.
You can fire up a spark shell and paste this:

    val sFile = sc.sequenceFile[LongWritable, Text]("/home/akhld/sequence/sigmoid")
      .map(_._2.toString)
    sFile.take(5).foreach(println)

Use the attached sequence file generator and the generated sequence file that I used for testing.

Also note: if you don't do the .map to convert the values to String, you will end up with the NotSerializableException that you are hitting. (A minimal Java sketch of the same idea, matching the code quoted below, follows after the quoted thread.)

Thanks
Best Regards

On Wed, May 20, 2015 at 5:48 PM, Tapan Sharma <tapan.sha...@gmail.com> wrote:

> I am not doing anything special.
>
> Here is the code:
>
>     SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile");
>     JavaSparkContext ctx = new JavaSparkContext(sparkConf);
>     JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class);
>
>     // The following statements are giving the exception
>     final List<Tuple2<String, Byte>> tuple2s = seqFiles.toArray();
>     // Or
>     final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect();
>
> And this is how I have created the sequence file:
> http://stuartsierra.com/2008/04/24/a-million-little-files
>
> Regards
> Tapan
>
> On Wed, May 20, 2015 at 12:42 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> If you can share the complete code and a sample file, maybe I can try to
>> reproduce it on my end.
>>
>> Thanks
>> Best Regards
>>
>> On Wed, May 20, 2015 at 7:00 AM, Tapan Sharma <tapan.sha...@gmail.com> wrote:
>>
>>> The problem is still there.
>>> The exception is not coming at the time of reading.
>>> Also, the count of the JavaPairRDD is as expected. It is when we call
>>> collect() or toArray() that the exception comes.
>>> It seems to be something to do with the Text class, even though I haven't
>>> used it in the program.
>>>
>>> Regards
>>> Tapan
>>>
>>> On Tue, May 19, 2015 at 6:26 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>
>>>> Try something like:
>>>>
>>>>     JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(inputDir,
>>>>         org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
>>>>         IntWritable.class, Text.class, new Job().getConfiguration());
>>>>
>>>> with the type of input format that you require.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma <tapan.sha...@gmail.com> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I am new to Spark and learning.
>>>>> I am trying to read image files into a Spark job. This is how I am doing it:
>>>>>
>>>>> Step 1. Created sequence files with the file name as key and the binary image
>>>>> as value, i.e. Text and BytesWritable.
>>>>> I am able to read these sequence files into MapReduce programs.
>>>>>
>>>>> Step 2. I understand that Text and BytesWritable are not serializable,
>>>>> therefore I read the sequence file in Spark as follows:
>>>>>
>>>>>     SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile");
>>>>>     JavaSparkContext ctx = new JavaSparkContext(sparkConf);
>>>>>     JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class);
>>>>>     final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect();
>>>>>
>>>>> The moment I try to call the collect() method to get the keys of the sequence
>>>>> file, the following exception is thrown.
>>>>>
>>>>> Can anyone help me understand why the collect() method is failing? If I use
>>>>> toArray() on the seqFiles object, I get the same call stack.
>>>>>
>>>>> Regards
>>>>> Tapan
>>>>>
>>>>> java.io.NotSerializableException: org.apache.hadoop.io.Text
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>>>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> 2015-05-19 15:15:03,705 ERROR [task-result-getter-0] scheduler.TaskSetManager
>>>>>     (Logging.scala:logError(75)) - Task 0.0 in stage 0.0 (TID 0) had a not
>>>>>     serializable result: org.apache.hadoop.io.Text; not retrying
>>>>> 2015-05-19 15:15:03,731 INFO [task-result-getter-0] scheduler.TaskSchedulerImpl
>>>>>     (Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
>>>>>     completed, from pool
>>>>> 2015-05-19 15:15:03,739 INFO [sparkDriver-akka.actor.default-dispatcher-2]
>>>>>     scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage 0
>>>>> 2015-05-19 15:15:03,747 INFO [main] scheduler.DAGScheduler
>>>>>     (Logging.scala:logInfo(59)) - Job 0 failed: collect at
>>>>>     JavaSequenceFile.java:44, took 4.421397 s
>>>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
>>>>>     to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable
>>>>>     result: org.apache.hadoop.io.Text
>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>>>     at scala.Option.foreach(Option.scala:236)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Reading-Binary-files-in-Spark-program-tp22942.html
>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
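Since the failing job in the quoted thread uses the Java API, here is a minimal Java sketch of the same fix. It assumes the sequence file was written with Text keys (file names) and BytesWritable values (image bytes), as described in the first message; the class name JavaSequenceFileRead is illustrative and not from the thread. The point is the same as in the Scala snippet: convert the Hadoop Writables to serializable Java types before calling collect().

import java.util.List;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// Hypothetical sketch: read a (Text, BytesWritable) sequence file of images and
// convert each Writable to a serializable type on the executors, so the driver
// never has to Java-serialize org.apache.hadoop.io.Text.
public class JavaSequenceFileRead {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFileRead");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        // Read the file with its actual Writable types...
        JavaPairRDD<Text, BytesWritable> raw =
            ctx.sequenceFile(args[0], Text.class, BytesWritable.class);

        // ...and map to (String, byte[]) before anything is shipped back.
        JavaPairRDD<String, byte[]> images = raw.mapToPair(
            new PairFunction<Tuple2<Text, BytesWritable>, String, byte[]>() {
                @Override
                public Tuple2<String, byte[]> call(Tuple2<Text, BytesWritable> t) {
                    return new Tuple2<>(t._1().toString(), t._2().copyBytes());
                }
            });

        // collect() now returns only serializable objects.
        List<Tuple2<String, byte[]>> tuple2s = images.collect();
        System.out.println("Collected " + tuple2s.size() + " images");

        ctx.stop();
    }
}

This also explains the quoted failure: declaring String.class and Byte.class in ctx.sequenceFile does not change what the input format actually produces, so the RDD still holds Text objects at runtime and they cannot be Java-serialized back to the driver. The earlier newAPIHadoopFile suggestion needs the same map-before-collect step for the same reason.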
Attachment: .sigmoid.crc (binary data)
Attachment: sigmoid (binary data)
/**
 * Created by akhld on 20/5/15.
 */
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// White, Tom (2012-05-10). Hadoop: The Definitive Guide (Kindle Locations 5375-5384).
// O'Reilly Media. Kindle Edition.
public class SequenceFileWriteDemo {

    private static final String[] DATA = {
        "One, two, buckle my shoe",
        "Three, four, shut the door",
        "Five, six, pick up sticks",
        "Seven, eight, lay them straight",
        "Nine, ten, a big fat hen"
    };

    public static void main(String[] args) throws IOException {
        String uri = "/home/akhld/sequence/sigmoid";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
            // Write 100 records: keys count down from 100, values cycle through DATA.
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                // System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}
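For the image use case from the first message (file name as Text key, raw image bytes as BytesWritable value), a writer along the same lines as the demo above might look like the following. This is a hypothetical sketch and not the code behind the linked article; the class name ImageSequenceFileWriter and the flat local-directory walk are assumptions.

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Hypothetical sketch: pack a local directory of images into one sequence file,
// with the file name as a Text key and the raw bytes as a BytesWritable value.
public class ImageSequenceFileWriter {

    public static void main(String[] args) throws IOException {
        String inputDir = args[0];   // e.g. a local directory of image files
        String uri = args[1];        // output sequence file path
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        File[] images = new File(inputDir).listFiles();  // assumes a flat, existing directory
        Text key = new Text();
        BytesWritable value = new BytesWritable();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, new Path(uri),
                    Text.class, BytesWritable.class);
            for (File image : images) {
                byte[] bytes = Files.readAllBytes(image.toPath());
                key.set(image.getName());
                value.set(bytes, 0, bytes.length);
                writer.append(key, value);   // one record per image
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}

A file written this way can then be read back with the Text/BytesWritable approach sketched after the quoted thread above.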