Re: Reading Binary files in Spark program
If you can share the complete code and a sample file, maybe I can try to reproduce it on my end.

Thanks
Best Regards
Re: Reading Binary files in Spark program
I am not doing anything special. *Here is the code:*

    SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class);

    // The following statement throws the exception:
    final List<Tuple2<String, Byte>> tuple2s = seqFiles.toArray();
    // Or:
    final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect();

*And this is how I have created the sequence file:*
http://stuartsierra.com/2008/04/24/a-million-little-files

Regards
Tapan
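Step 1 of the original mail describes the output format: Text keys (file names) and BytesWritable values (binary file contents). For readers without the linked post handy, here is a minimal writer sketch for that format, using Hadoop 2.x's option-based SequenceFile.Writer API; the class name and argument handling are illustrative, not necessarily the linked post's exact code:

    import java.io.File;
    import java.nio.file.Files;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class ImageSequenceFileWriter {
        public static void main(String[] args) throws Exception {
            // args[0]: output sequence file; args[1..]: image files to pack.
            Configuration conf = new Configuration();
            SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(new Path(args[0])),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class));
            try {
                for (int i = 1; i < args.length; i++) {
                    File image = new File(args[i]);
                    byte[] bytes = Files.readAllBytes(image.toPath());
                    // File name as the key, raw image bytes as the value.
                    writer.append(new Text(image.getName()), new BytesWritable(bytes));
                }
            } finally {
                IOUtils.closeStream(writer);
            }
        }
    }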
Re: Reading Binary files in Spark program
Hi,

Basically, you need to convert it to a serializable format before doing the collect. You can fire up a spark shell and paste this:

    val sFile = sc.sequenceFile[LongWritable, Text]("/home/akhld/sequence/sigmoid")
      .map(_._2.toString)
    sFile.take(5).foreach(println)

Use the attached sequence file generator and the generated sequence file that I used for testing. Also note: if you don't do the .map to convert to String, then it will end up with the serializable exception that you are hitting.

Thanks
Best Regards
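The same conversion in Java, to match the program in this thread: read with the Writable classes actually stored in the file, then copy each record into plain serializable Java types before collecting. This is only a sketch, and it assumes the keys are Text and the values BytesWritable, as described in Step 1 of the original mail:

    import java.util.List;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class JavaSequenceFileRead {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile");
            JavaSparkContext ctx = new JavaSparkContext(sparkConf);

            // Read with the Writable types the file was written with.
            JavaPairRDD<Text, BytesWritable> seqFiles =
                ctx.sequenceFile(args[0], Text.class, BytesWritable.class);

            // Copy each Writable into serializable Java types *before* any
            // collect(); copyBytes() also trims the BytesWritable backing
            // array to its valid length.
            JavaPairRDD<String, byte[]> converted = seqFiles.mapToPair(
                t -> new Tuple2<>(t._1().toString(), t._2().copyBytes()));

            List<Tuple2<String, byte[]>> tuple2s = converted.collect();
            System.out.println("Collected " + tuple2s.size() + " records");
            ctx.stop();
        }
    }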
Re: Reading Binary files in Spark program
Thanks. I will try that and let you know. But what exactly is the issue? Any pointers?

Regards
Tapan
Re: Reading Binary files in Spark program
The problem is still there. The exception does not come at read time, and the count of the JavaPairRDD is as expected. It is when we call the collect() or toArray() methods that the exception comes. It is something to do with the Text class, even though I haven't used it in the program.

Regards
Tapan
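That split between count() and collect() is the telling part: count() only sends a per-partition tally back to the driver, while collect() must Java-serialize every record, and the records still wrap org.apache.hadoop.io.Text under the hood (as the stack trace shows; Text is not java.io.Serializable). A short illustration, reusing the RDD type declared in the program:

    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import scala.Tuple2;

    class CountVsCollect {
        static void demo(JavaPairRDD<String, Byte> seqFiles) {
            // Each task returns just a long, so no record is ever
            // serialized back to the driver: this succeeds.
            long n = seqFiles.count();

            // Every (key, value) pair must be serialized to the driver;
            // the wrapped Text values make this throw
            // java.io.NotSerializableException.
            List<Tuple2<String, Byte>> all = seqFiles.collect();
        }
    }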
Re: Reading Binary files in Spark program
Try something like:

    JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(inputDir,
        org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
        IntWritable.class, Text.class, new Job().getConfiguration());

with the input format type that you require.

Thanks
Best Regards

On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma tapan.sha...@gmail.com wrote:

Hi Team,

I am new to Spark and learning. I am trying to read image files in a Spark job. This is how I am doing it:

Step 1. Created sequence files with the file name as the key and the binary image as the value, i.e. Text and BytesWritable. I am able to read these sequence files in MapReduce programs.

Step 2. I understand that Text and BytesWritable are not serializable; therefore, I read the sequence file in Spark as follows:

    SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class);
    final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect();

The moment I call the collect() method to get the keys of the sequence file, the following exception is thrown. Can anyone help me understand why collect() is failing? If I use toArray() on the seqFiles object, I get the same call stack.

Regards
Tapan

java.io.NotSerializableException: org.apache.hadoop.io.Text
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

2015-05-19 15:15:03,705 ERROR [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not retrying
2015-05-19 15:15:03,731 INFO [task-result-getter-0] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all completed, from pool
2015-05-19 15:15:03,739 INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage 0
2015-05-19 15:15:03,747 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 0 failed: collect at JavaSequenceFile.java:44, took 4.421397 s

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    ...
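For completeness, a compilable version of the newAPIHadoopFile suggestion at the top of this reply, with the imports it needs spelled out. inputDir is a placeholder path, and the IntWritable/Text types should be swapped for whatever the file actually stores:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class NewApiRead {
        public static JavaPairRDD<IntWritable, Text> read(JavaSparkContext sc, String inputDir)
                throws IOException {
            // new Job() only serves to build a Hadoop Configuration here;
            // its constructor can throw IOException, hence the throws clause.
            return sc.newAPIHadoopFile(inputDir,
                SequenceFileInputFormat.class,
                IntWritable.class, Text.class,
                new Job().getConfiguration());
        }
    }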