Hi Deng. Thanks for the response.
Is it possible to load the sequence files in parallel and process each of them in parallel, all within a single Spark job?
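Something along the lines of this driver-side sketch is what I have in
mind -- untested, reusing the Message type and getProtos() helper from my
earlier code quoted below; the output path is just a placeholder:

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

// sc (JavaSparkContext) and fileList (List<String>) are set up as in my
// earlier code. Every RDD is created here on the driver; the map() below
// still executes in parallel on the executors.
JavaRDD<Message> allLogs = null;
for (String hdfsPath : fileList)
{
    JavaPairRDD<IntWritable, BytesWritable> content =
        sc.sequenceFile(hdfsPath, IntWritable.class, BytesWritable.class);

    JavaRDD<Message> logs = content.map(
        new Function<Tuple2<IntWritable, BytesWritable>, Message>()
    {
        public Message call(Tuple2<IntWritable, BytesWritable> tuple2) throws Exception
        {
            BytesWritable value = tuple2._2();
            BytesWritable tmp = new BytesWritable();
            tmp.setCapacity(value.getLength());
            tmp.set(value);
            return (Message) getProtos(1, tmp.getBytes());
        }
    });

    allLogs = (allLogs == null) ? logs : allLogs.union(logs);
}

// saveAsObjectFile is an action, so this is what actually triggers the
// parallel reads; the path is a placeholder.
allLogs.saveAsObjectFile("/home/sas/logsfilterapp/temp/all");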
Regards,
Vinoth Sankar

On Fri, Oct 30, 2015 at 2:56 PM Deng Ching-Mallete <och...@apache.org> wrote:

> Hi,
>
> You seem to be creating a new RDD for each element in your files RDD.
> What I would suggest is to load and process only one sequence file per
> Spark job, and then execute multiple Spark jobs, one for each sequence
> file.
>
> With regard to your question of where to view the logs inside the
> closures, you should be able to see them in the executor log (via the
> Spark UI, in the Executors page).
>
> HTH,
> Deng
>
> On Fri, Oct 30, 2015 at 1:20 PM, Vinoth Sankar <vinoth9...@gmail.com> wrote:
>
>> Hi Adrian,
>>
>> Yes, I need to load all the files and process them in parallel. The
>> following code doesn't seem to work (here I used map, and even tried
>> foreach): I'm just downloading the files from HDFS to the local system
>> and printing the log count of each file. It doesn't throw any
>> exceptions, but it doesn't work either -- the files are not downloaded,
>> and I never get that LOGGER print. The same code works if I iterate
>> over the files sequentially, but then it isn't parallelized. How do I
>> get my code parallelized and working?
>>
>> JavaRDD<String> files = sparkContext.parallelize(fileList);
>>
>> files.map(new Function<String, Void>()
>> {
>>     public static final long serialVersionUID = 1L;
>>
>>     @Override
>>     public Void call(String hdfsPath) throws Exception
>>     {
>>         // sc is the driver-side JavaSparkContext; calling it here, in a
>>         // function shipped to the executors, is the RDD-inside-an-RDD
>>         // problem Deng describes above.
>>         JavaPairRDD<IntWritable, BytesWritable> hdfsContent =
>>             sc.sequenceFile(hdfsPath, IntWritable.class, BytesWritable.class);
>>         JavaRDD<Message> logs = hdfsContent.map(
>>             new Function<Tuple2<IntWritable, BytesWritable>, Message>()
>>         {
>>             public Message call(Tuple2<IntWritable, BytesWritable> tuple2) throws Exception
>>             {
>>                 BytesWritable value = tuple2._2();
>>                 BytesWritable tmp = new BytesWritable();
>>                 tmp.setCapacity(value.getLength());
>>                 tmp.set(value);
>>                 return (Message) getProtos(1, tmp.getBytes());
>>             }
>>         });
>>
>>         String path = "/home/sas/logsfilterapp/temp/" + hdfsPath.substring(26);
>>
>>         Thread.sleep(2000);
>>         logs.saveAsObjectFile(path);
>>
>>         LOGGER.log(Level.INFO, "HDFS Path : {0} Count : {1}",
>>             new Object[] { hdfsPath, logs.count() });
>>         return null;
>>     }
>> });
>>
>> Note: in another scenario, too, I didn't get the log statements that
>> sit inside the map/filter closures, while logs outside these closures
>> are printed as usual. If I can't get the logger prints inside these
>> closures, how do I debug them?
>>
>> Thanks
>> Vinoth Sankar
>>
>> On Wed, Oct 28, 2015 at 8:29 PM Adrian Tanase <atan...@adobe.com> wrote:
>>
>>> The first line is distributing your fileList variable across the
>>> cluster as an RDD, partitioned using the default partitioner settings
>>> (e.g. the number of cores in your cluster).
>>>
>>> Each of your workers would get one or more slices of the data
>>> (depending on how many cores each executor has); the abstraction is
>>> called a partition.
>>>
>>> What is your use case? If you want to load the files and continue
>>> processing in parallel, then a simple .map should work. If you want
>>> to execute arbitrary code based on the list of files that each
>>> executor received, then you need to use .foreach, which will get
>>> executed for each of the entries, on the worker.
>>>
>>> -adrian
>>>
>>> From: Vinoth Sankar
>>> Date: Wednesday, October 28, 2015 at 2:49 PM
>>> To: "user@spark.apache.org"
>>> Subject: How do I parallize Spark Jobs at Executor Level.
>>>
>>> Hi,
>>>
>>> I'm reading and filtering a large number of files using Spark, but it
>>> only gets parallelized at the Spark driver level. How do I make it
>>> parallelize at the executor (worker) level? Refer to the following
>>> sample. Is there any way to iterate over the localIterator in
>>> parallel?
>>>
>>> Note: I use Java 1.7.
>>>
>>> JavaRDD<String> files = javaSparkContext.parallelize(fileList);
>>> Iterator<String> localIterator = files.toLocalIterator();
>>>
>>> Regards
>>> Vinoth Sankar
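P.S. Just to check my understanding of running one Spark job per sequence
file, as Deng suggests above: I picture it roughly like the sketch below,
submitted once per file path. The class name, argument handling and the
count-only body are my own assumptions.

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical single-file job, submitted once per sequence file, e.g.:
// spark-submit --class SingleFileJob logsfilterapp.jar <hdfsPath>
public class SingleFileJob
{
    public static void main(String[] args)
    {
        String hdfsPath = args[0];
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setAppName("logsfilter-" + hdfsPath));

        // Reading a single sequence file is still parallelized within
        // the job: Spark creates one task per HDFS block of the file.
        JavaPairRDD<IntWritable, BytesWritable> content =
            sc.sequenceFile(hdfsPath, IntWritable.class, BytesWritable.class);

        System.out.println(hdfsPath + " count: " + content.count());
        sc.stop();
    }
}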