Re: Issue in spark job. Remote RPC client disassociated
Hello,

The variable argsList is an array defined above the parallel block. This variable is accessed inside the map function. Launcher.main is not thread-safe. Is it not possible to tell Spark that every folder should be processed as a separate process, in a separate working directory?

Regards
Bala

On 14-Jul-2016 2:37 pm, "Sun Rui" wrote:
> Where is argsList defined? Is Launcher.main() thread-safe? Note that if
> multiple folders are processed in a node, multiple threads may concurrently
> run in the executor, each processing a folder.
>
> On Jul 14, 2016, at 12:28, Balachandar R.A. wrote:
>
> Hello Ted,
>
> Thanks for the response. Here is the additional information.
>
>> I am using spark 1.6.1 (spark-1.6.1-bin-hadoop2.6)
>>
>> Here is the code snippet:
>>
>> JavaRDD<File> add = jsc.parallelize(listFolders, listFolders.size());
>> JavaRDD<Integer> test = add.map(new Function<File, Integer>() {
>>     @Override
>>     public Integer call(File file) throws Exception {
>>         String folder = file.getName();
>>         System.out.println("[x] Processing dataset from the directory " + folder);
>>         int status = 0;
>>         // full path of the input folder. The input folder is on a shared file system
>>         // that every worker node can access, something like "/home/user/software/data/",
>>         // and the folder name will be like "20161307"
>>         argsList[3] = argsList[3] + "/" + folder;
>>         // full path of the output, also on the shared file system
>>         argsList[7] = argsList[7] + "/" + folder + ".csv";
>>         try {
>>             // Launcher is a black box. It processes the input folder and creates
>>             // a csv file at the output location (argsList[7])
>>             Launcher.main(argsList);
>>             status = 0;
>>         } catch (Exception e) {
>>             System.out.println("[x] Execution of import tool for the directory " + folder + " failed");
>>             status = 0;
>>         }
>>         accum.add(1);
>>         return status;
>>     }
>> });
>>
>> Here is the spark-env.sh:
>>
>> export SPARK_WORKER_INSTANCES=1
>> export JAVA_HOME=/home/work_IW1/opt/jdk1.8.0_77/
>> export HADOOP_CONF_DIR=/home/work_IW1/opt/hadoop-2.7.2/etc/hadoop
>>
>> Here is the spark-defaults.conf:
>>
>> spark.master                 spark://master:7077
>> spark.eventLog.enabled       true
>> spark.eventLog.dir           hdfs://master:9000/sparkEvent
>> spark.serializer             org.apache.spark.serializer.KryoSerializer
>> spark.driver.memory          4g
>
> Hope it helps.
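A minimal sketch of the "one OS process per folder" idea raised above, in Scala on the driver side. Everything named here is an assumption for illustration: the command line used to invoke the launcher tool ("java -cp /path/to/tool.jar Launcher input output"), the /tmp/work scratch location, and the folders/inputRoot/outputRoot values. Because each folder is handled by a freshly spawned JVM running in its own working directory, the thread-safety of Launcher.main stops being a concern.

import java.io.File
import scala.sys.process._

// folders: Seq[String] of folder names; inputRoot and outputRoot point to the shared
// file system (all three are assumptions for this sketch)
val statuses = sc.parallelize(folders, folders.size).map { folder =>
  val workDir = new File("/tmp/work/" + folder)      // private working directory per folder
  workDir.mkdirs()
  // hypothetical command line for the black-box tool; adjust to the real invocation
  val cmd = Seq("java", "-cp", "/path/to/tool.jar", "Launcher",
                inputRoot + "/" + folder, outputRoot + "/" + folder + ".csv")
  Process(cmd, workDir).!                            // runs in its own JVM, returns the exit code
}.collect()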
Re: Issue in spark job. Remote RPC client disassociated
> > Hello Ted, > Thanks for the response. Here is the additional information. > I am using spark 1.6.1 (spark-1.6.1-bin-hadoop2.6) > > > > Here is the code snippet > > > > > > JavaRDD add = jsc.parallelize(listFolders, listFolders.size()); > > JavaRDD test = add.map(new Function() { > > @Override > > public Integer call(File file) throws Exception { > > String folder = file.getName(); > > System.out.println("[x] Processing dataset from the > directory " + folder); > > int status = 0; > >argsList[3] = argsList[3] + "/"+ folder; // full path > of the input folder. Input folder is in shared file system that every > worker node has access to it. Something like (“/home/user/software/data/”) > and folder name will be like (“20161307”) > > argsList[7] = argsList[7] + "/" + folder + ".csv"; // > full path of the output. > > try{ > > Launcher.main(argsList); // Launcher class is a > black box. It process the input folder and create a csv file which in the > output location (argsList[7]). This is also in a shared file system > > status = 0; > > } > > catch(Exception e) > > { > > System.out.println("[x] Execution of import tool > for the directory " + folder + "failed"); > > status = 0; > > } > > accum.add(1); > > return status; > > } > > }); > > > > > > Here is the spark-env.sh > > > > export SPARK_WORKER_INSTANCES=1 > > export JAVA_HOME=/home/work_IW1/opt/jdk1.8.0_77/ > > export HADOOP_CONF_DIR=/home/work_IW1/opt/hadoop-2.7.2/etc/hadoop > > > > Here is the spark-defaults.conf > > > > > > spark.master spark:// master:7077 > > spark.eventLog.enabled true > > spark.eventLog.dir hdfs://master:9000/sparkEvent > > spark.serializer > org.apache.spark.serializer.KryoSerializer > > spark.driver.memory 4g > > > Hope it helps.
Issue in spark job. Remote RPC client disassociated
Hello,

In one of my use cases, I need to process a list of folders in parallel. I used sc.parallelize(list, list.size).map("logic to process the folder"). I have a six-node cluster and there are six folders to process. Ideally I expect each of my nodes to process one folder. But I see that one node processes multiple folders while one or two of the nodes do not get any job at all. In the end, the spark-submit crashes with an exception saying "Remote RPC client disassociated". Can someone give me a hint on what's going wrong here?

Please note that this issue does not arise if I comment out my logic that processes the folder and simply print the folder name instead; in that case, every node gets one folder to process. I also inserted a sleep of 40 seconds inside the map: no issue. But when I uncomment my logic, I see this issue. Also, before crashing, it does process some of the folders successfully ("successfully" meaning the business logic generates a file on a shared file system).

Regards
Bala
Spark job state is EXITED but does not return
Hello,

I have a simple Apache Spark based use case that processes two datasets. Each dataset takes about 5-7 min to process. I am doing this processing inside the sc.parallelize(datasets){ } block. While the first dataset is processed successfully, the processing of the second dataset is never started by Spark. The application state is RUNNING, but in the executor summary I notice that the executor state is EXITED. Can someone tell me where things are going wrong?

Regards
Bala
Re: One map per folder in spark or Hadoop
Hi,

Thanks for the code snippet. What if the executable inside the map process needs to access directories and files present in the local file system? Is that possible? I know the tasks run on slave nodes in a temporary working directory, and I can think about the distributed cache, but I would still like to know if the map process can access the local file system.

Regards
Bala

On 01-Jul-2016 7:46 am, "Sun Rui" wrote:
> Say you have got all of your folder paths into a val folders: Seq[String]
>
> val add = sc.parallelize(folders, folders.size).mapPartitions { iter =>
>   val folder = iter.next
>   val status: Int =
>   Seq(status).toIterator
> }
>
> On Jun 30, 2016, at 16:42, Balachandar R.A. wrote:
>
> Hello,
>
> I have some 100 folders. Each folder contains 5 files. I have an
> executable that processes one folder. The executable is a black box and hence
> it cannot be modified. I would like to process the 100 folders in parallel using
> Apache Spark so that I can spawn one map task per folder. Can
> anyone give me an idea? I have come across similar questions, but with
> Hadoop, and the answer was to use combineFileInputFormat and pathFilter.
> However, as I said, I want to use Apache Spark. Any idea?
>
> Regards
> Bala
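A minimal sketch of how the snippet quoted above could be completed with an external executable, under stated assumptions: "/path/to/tool" stands in for the real black-box binary, and scala.sys.process is used to launch it. The child process runs on the worker node, so it can read and write that node's local file system directly; a file distributed with sc.addFile can be located on the executor via SparkFiles.get.

import org.apache.spark.SparkFiles
import scala.sys.process._

val add = sc.parallelize(folders, folders.size).mapPartitions { iter =>
  val folder = iter.next()
  // the external process inherits the executor's environment and sees the node's local disk;
  // "/path/to/tool" is a placeholder for the real executable
  val status: Int = Seq("/path/to/tool", folder).!
  // a file shipped to the executors with sc.addFile("config.txt") would be found like this:
  // val localConfig = SparkFiles.get("config.txt")
  Iterator(status)
}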
Re: One map per folder in spark or Hadoop
Thank you very much. I will try this code and update you.

Regards
Bala

On 01-Jul-2016 7:46 am, "Sun Rui" wrote:
> Say you have got all of your folder paths into a val folders: Seq[String]
>
> val add = sc.parallelize(folders, folders.size).mapPartitions { iter =>
>   val folder = iter.next
>   val status: Int =
>   Seq(status).toIterator
> }
>
> On Jun 30, 2016, at 16:42, Balachandar R.A. wrote:
>
> Hello,
>
> I have some 100 folders. Each folder contains 5 files. I have an
> executable that processes one folder. The executable is a black box and hence
> it cannot be modified. I would like to process the 100 folders in parallel using
> Apache Spark so that I can spawn one map task per folder. Can
> anyone give me an idea? I have come across similar questions, but with
> Hadoop, and the answer was to use combineFileInputFormat and pathFilter.
> However, as I said, I want to use Apache Spark. Any idea?
>
> Regards
> Bala
One map per folder in spark or Hadoop
Hello,

I have some 100 folders. Each folder contains 5 files. I have an executable that processes one folder. The executable is a black box and hence it cannot be modified. I would like to process the 100 folders in parallel using Apache Spark so that I can spawn one map task per folder. Can anyone give me an idea? I have come across similar questions, but with Hadoop, and the answer was to use combineFileInputFormat and pathFilter. However, as I said, I want to use Apache Spark. Any idea?

Regards
Bala
How to calculate weighted degrees in GraphX
I am new to GraphX and am exploring the example flight data analysis found online:
http://www.sparktutorials.net/analyzing-flight-data:-a-gentle-introduction-to-graphx-in-spark

I tried calculating inDegrees (to understand how many incoming flights an airport has), but the value I see corresponds to the number of unique incoming routes to the airport. This means that if there is more than one flight between a particular source and destination, let's say 5, I am seeing only 1 for this route. What I want is something along the lines of a weighted degree, where the weight associated with each edge is taken into account while calculating the in-degree. This weight value is in the edge._attr parameter, which is not considered when computing degrees. Here is my code block:

val inDegrees = graph.inDegrees.join(airportVertices).sortBy(_._2._1, ascending = false).take(10)
val outDegrees = graph.outDegrees.join(airportVertices).sortBy(_._2._1, ascending = false).take(10)

// Top 10 in-degrees
println("Top 10 indegrees -> ")
inDegrees.foreach { println }

// Top 10 out-degrees
println("Top 10 outDegrees -> ")
outDegrees.foreach { println }

Can someone point me to the right link or give me a hint to solve this?

Regards
Bala
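One way to get a weighted in-degree is GraphX's aggregateMessages, summing the edge attribute per destination vertex instead of counting edges. A minimal sketch follows; it assumes the edge attribute is (or can be converted to) a numeric flight count, and it reuses the airportVertices join from the code above.

// send each edge's weight to its destination airport and sum per vertex
val weightedInDegrees = graph.aggregateMessages[Long](
  ctx => ctx.sendToDst(ctx.attr.toLong),  // assumes the edge attr is numeric (e.g. number of flights)
  _ + _                                    // merge: add the weights
)

// top 10 airports by weighted in-degree
weightedInDegrees.join(airportVertices)
  .sortBy(_._2._1, ascending = false)
  .take(10)
  .foreach(println)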
Re: GraphX can show graph?
Thanks... will look into that.

- Bala

On 28 January 2016 at 15:36, Sahil Sareen wrote:
> Try Neo4j for visualization; GraphX does a pretty good job at distributed
> graph processing.
>
> On Thu, Jan 28, 2016 at 12:42 PM, Balachandar R.A. <balachandar...@gmail.com> wrote:
>
>> Hi
>>
>> I am new to GraphX. I have a simple csv file which I could load and
>> compute a few graph statistics on. However, I am not sure whether it is possible
>> to create and show a graph (for visualization purposes) using GraphX. Any
>> pointer to a tutorial or information connected to this would be really helpful.
>>
>> Thanks and regards
>> Bala
GraphX can show graph?
Hi

I am new to GraphX. I have a simple csv file which I could load and compute a few graph statistics on. However, I am not sure whether it is possible to create and show a graph (for visualization purposes) using GraphX. Any pointer to a tutorial or information connected to this would be really helpful.

Thanks and regards
Bala
spark-shell issue: Job in illegal state & SparkContext not serializable
Hello users,

In one of my use cases, I need to launch a Spark job from spark-shell. My input file is in HDFS, and I am using NewHadoopRDD to construct an RDD out of this input file because it uses a custom input format.

val hConf = sc.hadoopConfiguration
var job = new Job(hConf)
FileInputFormat.setInputPaths(job, new Path(path))
var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
  classOf[IntWritable], classOf[BytesWritable], job.getConfiguration())

However, when I run these commands, especially the 2nd line, in spark-shell, I get the exception below:

java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING

So I put these lines inside a method in another class, and in my driver code I call that method:

// obtain RDD for the input file in HDFS
var hRDD = cas.getRDD("/user/bala/RDF_STORE/MERGED_DAR.DAT_256")

The above line constructs the Hadoop RDD and returns the handle to the driver code. This works fine now, which means I could work around the IllegalStateException issue. However, when I run the command below

// run the map and collect the results from the distributed machines
val result = hRDD.mapPartitionsWithInputSplit { (split, iter) => cas.extractCAS(split, iter) }.collect()

I get the error "Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext". Can someone help me work around this problem? This code works perfectly when I use spark-submit, but I cannot use that here. My ultimate idea is to run the driver code from Zeppelin, hence I am testing this code in spark-shell.

With regards
Bala
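A minimal sketch of one common workaround for the NotSerializableException, under the assumption that extractCAS itself does not need the SparkContext: move the extraction logic into a standalone object (or mark the SparkContext field in the enclosing class as @transient) so that the closure passed to mapPartitionsWithInputSplit no longer drags the SparkContext along. CasFunctions and the String result type are hypothetical stand-ins for the real code.

import org.apache.hadoop.io.{BytesWritable, IntWritable}
import org.apache.hadoop.mapreduce.InputSplit

object CasFunctions extends Serializable {
  // no reference to a SparkContext in here, so nothing unserializable is captured
  def extractCAS(split: InputSplit,
                 iter: Iterator[(IntWritable, BytesWritable)]): Iterator[String] = {
    iter.map { case (_, bytes) => "processed " + bytes.getLength + " bytes" }  // placeholder logic
  }
}

val result = hRDD.mapPartitionsWithInputSplit { (split, iter) =>
  CasFunctions.extractCAS(split, iter)
}.collect()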
Re: converting categorical values in csv file to numerical values
Hi Guillaume,

That is always an option. However, I read about HashingTF, which does exactly this quite efficiently and can scale too. Hence, I am looking for a solution using that technique.

Regards
Bala

On 5 November 2015 at 18:50, tog wrote:
> Hi Bala
>
> Can't you build a simple dictionary and map those values to numbers?
>
> Cheers
> Guillaume
>
> On 5 November 2015 at 09:54, Balachandar R.A. wrote:
>
>> Hi
>>
>> I am new to Spark MLlib and machine learning. I have a csv file that
>> consists of around 100 thousand rows and 20 columns. Of these 20 columns,
>> 10 contain string values. The values in these columns are not necessarily
>> unique; they are categorical, that is, each value is one among, say, 10
>> possible values. To start with, I could run the examples, especially the
>> random forest algorithm, on my local Spark (1.5.1) platform. However, I
>> have a challenge with my dataset because of these strings, as the APIs take
>> numerical values. Can anyone tell me how I can map these categorical
>> values (strings) into numbers and use them with the random forest algorithms?
>> Any example will be greatly appreciated.
>>
>> Regards
>> Bala
>
> --
> PGP KeyID: 2048R/EA31CFC9 subkeys.pgp.net
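A small sketch of the HashingTF idea mentioned above, using the MLlib feature transformer; the column values and the choice of 1000 hash buckets are assumptions for illustration (with only about 10 distinct values per column, collisions should be rare, but hashing does not guarantee they cannot happen).

import org.apache.spark.mllib.feature.HashingTF

val hashingTF = new HashingTF(numFeatures = 1000)

// map a single categorical string to a stable numeric index
val colorIndex: Int = hashingTF.indexOf("red")

// or hash all categorical columns of a row at once into a sparse feature vector;
// prefixing with the column name keeps equal values in different columns distinct
val rowFeatures = hashingTF.transform(Seq("color=red", "size=large", "region=south"))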
converting categorical values in csv file to numerical values
Hi

I am new to Spark MLlib and machine learning. I have a csv file that consists of around 100 thousand rows and 20 columns. Of these 20 columns, 10 contain string values. The values in these columns are not necessarily unique; they are categorical, that is, each value is one among, say, 10 possible values. To start with, I could run the examples, especially the random forest algorithm, on my local Spark (1.5.1) platform. However, I have a challenge with my dataset because of these strings, as the APIs take numerical values. Can anyone tell me how I can map these categorical values (strings) into numbers and use them with the random forest algorithms? Any example will be greatly appreciated.

Regards
Bala
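A minimal sketch of the simplest approach (build a dictionary per categorical column and map each string to an index), assuming the csv has already been split into an RDD[Array[String]] called rows and that column 3 is one of the string columns; both names are made up for the example. The resulting codes can then be fed to MLlib's random forest, optionally declaring the column as categorical via categoricalFeaturesInfo.

// build a value -> index dictionary for one categorical column
val colIdx = 3
val dict: Map[String, Double] = rows
  .map(_(colIdx))
  .distinct()
  .collect()
  .zipWithIndex
  .map { case (value, idx) => value -> idx.toDouble }
  .toMap

// broadcast the (small) dictionary and replace the strings with their numeric codes
val dictBc = sc.broadcast(dict)
val encodedColumn = rows.map(r => dictBc.value(r(colIdx)))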
Re: Error : - No filesystem for scheme: spark
I made a stupid mistake it seems. I supplied the --master option to the spark url in my launch command. And this error is gone. Thanks for pointing out possible places for troubleshooting Regards Bala On 02-Nov-2015 3:15 pm, "Balachandar R.A." wrote: > No.. I am not using yarn. Yarn is not running in my cluster. So, it is > standalone one. > > Regards > Bala > On 02-Nov-2015 3:11 pm, "Jean-Baptiste Onofré" wrote: > >> Just to be sure: you use yarn cluster (not standalone), right ? >> >> Regards >> JB >> >> On 11/02/2015 10:37 AM, Balachandar R.A. wrote: >> >>> Yes. In two different places I use spark:// >>> >>> 1. In my code, while creating spark configuration, I use the code below >>> >>> val sConf = new >>> SparkConf().setAppName("Dummy").setMaster("spark://:7077") >>> val sConf = val sc = new SparkContext(sConf) >>> >>> >>> 2. I run the job using the command below >>> >>> spark-submit --class org.myjob --jars myjob.jar spark://:7077 >>> myjob.jar >>> >>> regards >>> Bala >>> >>> >>> On 2 November 2015 at 14:59, Romi Kuntsman >> <mailto:r...@totango.com>> wrote: >>> >>> except "spark.master", do you have "spark://" anywhere in your code >>> or config files? >>> >>> *Romi Kuntsman*, /Big Data Engineer/_ >>> _ >>> http://www.totango.com <http://www.totango.com/> >>> >>> On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A. >>> mailto:balachandar...@gmail.com>> wrote: >>> >>> >>> -- Forwarded message -- >>> From: "Balachandar R.A." >> <mailto:balachandar...@gmail.com>> >>> Date: 02-Nov-2015 12:53 pm >>> Subject: Re: Error : - No filesystem for scheme: spark >>> To: "Jean-Baptiste Onofré" >> <mailto:j...@nanthrax.net>> >>> Cc: >>> >>> > HI JB, >>> > Thanks for the response, >>> > Here is the content of my spark-defaults.conf >>> > >>> > >>> > # Default system properties included when running >>> spark-submit. >>> > # This is useful for setting default environmental settings. >>> > >>> > # Example: >>> > spark.master spark://fdoat:7077 >>> > # spark.eventLog.enabled true >>> > spark.eventLog.dir/home/bala/spark-logs >>> > # spark.eventLog.dir >>> hdfs://namenode:8021/directory >>> > # spark.serializer >>> org.apache.spark.serializer.KryoSerializer >>> > # spark.driver.memory 5g >>> > # spark.executor.extraJavaOptions -XX:+PrintGCDetails >>> -Dkey=value -Dnumbers="one two three" >>> > >>> > >>> > regards >>> > Bala >>> >>> >>> > >>> > On 2 November 2015 at 12:21, Jean-Baptiste Onofré >>> mailto:j...@nanthrax.net>> wrote: >>> >> >>> >> Hi, >>> >> >>> >> do you have something special in conf/spark-defaults.conf >>> (especially on the eventLog directory) ? >>> >> >>> >> Regards >>> >> JB >>> >> >>> >> >>> >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote: >>> >>> >>> >>> Can someone tell me at what point this error could come? >>> >>> >>> >>> In one of my use cases, I am trying to use hadoop custom >>> input format. >>> >>> Here is my code. >>> >>> >>> >>> |valhConf:Configuration=sc.hadoopConfiguration >>> >>> >>> >>> hConf.set("fs.hdfs.impl",classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)hConf.set("fs.file.impl",classOf[org.apache.hadoop.fs.LocalFileSystem].ge
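For anyone hitting the same error, the corrected launch command has roughly the shape below, with <master-host> as a placeholder: the master URL goes behind --master instead of being passed as a positional argument. Most likely the original failure came from spark-submit treating the first positional argument as the application jar, so the spark:// URL ended up being fetched as if it were a file.

spark-submit --class org.myjob --master spark://<master-host>:7077 myjob.jar

(--jars is only needed for additional dependency jars; the application jar itself is the positional argument.)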
Re: Error : - No filesystem for scheme: spark
No.. I am not using yarn. Yarn is not running in my cluster. So, it is standalone one. Regards Bala On 02-Nov-2015 3:11 pm, "Jean-Baptiste Onofré" wrote: > Just to be sure: you use yarn cluster (not standalone), right ? > > Regards > JB > > On 11/02/2015 10:37 AM, Balachandar R.A. wrote: > >> Yes. In two different places I use spark:// >> >> 1. In my code, while creating spark configuration, I use the code below >> >> val sConf = new >> SparkConf().setAppName("Dummy").setMaster("spark://:7077") >> val sConf = val sc = new SparkContext(sConf) >> >> >> 2. I run the job using the command below >> >> spark-submit --class org.myjob --jars myjob.jar spark://:7077 >> myjob.jar >> >> regards >> Bala >> >> >> On 2 November 2015 at 14:59, Romi Kuntsman > <mailto:r...@totango.com>> wrote: >> >> except "spark.master", do you have "spark://" anywhere in your code >> or config files? >> >> *Romi Kuntsman*, /Big Data Engineer/_ >> _ >> http://www.totango.com <http://www.totango.com/> >> >> On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A. >> mailto:balachandar...@gmail.com>> wrote: >> >> >> -- Forwarded message -- >> From: "Balachandar R.A." > <mailto:balachandar...@gmail.com>> >> Date: 02-Nov-2015 12:53 pm >> Subject: Re: Error : - No filesystem for scheme: spark >> To: "Jean-Baptiste Onofré" > <mailto:j...@nanthrax.net>> >> Cc: >> >> > HI JB, >> > Thanks for the response, >> > Here is the content of my spark-defaults.conf >> > >> > >> > # Default system properties included when running spark-submit. >> > # This is useful for setting default environmental settings. >> > >> > # Example: >> > spark.master spark://fdoat:7077 >> > # spark.eventLog.enabled true >> > spark.eventLog.dir/home/bala/spark-logs >> > # spark.eventLog.dir >> hdfs://namenode:8021/directory >> > # spark.serializer >> org.apache.spark.serializer.KryoSerializer >> > # spark.driver.memory 5g >> > # spark.executor.extraJavaOptions -XX:+PrintGCDetails >> -Dkey=value -Dnumbers="one two three" >> > >> > >> > regards >> > Bala >> >> >> > >> > On 2 November 2015 at 12:21, Jean-Baptiste Onofré >> mailto:j...@nanthrax.net>> wrote: >> >> >> >> Hi, >> >> >> >> do you have something special in conf/spark-defaults.conf >> (especially on the eventLog directory) ? >> >> >> >> Regards >> >> JB >> >> >> >> >> >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote: >> >>> >> >>> Can someone tell me at what point this error could come? >> >>> >> >>> In one of my use cases, I am trying to use hadoop custom >> input format. >> >>> Here is my code. >> >>> >> >>> |valhConf:Configuration=sc.hadoopConfiguration >> >>> >> >> hConf.set("fs.hdfs.impl",classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)hConf.set("fs.file.impl",classOf[org.apache.hadoop.fs.LocalFileSystem].getName)varjob >> >>> >> >> =newJob(hConf)FileInputFormat.setInputPaths(job,newPath("hdfs:///user/bala/MyBinaryFile"));varhRDD >> >>> >> >> =newNewHadoopRDD(sc,classOf[RandomAccessInputFormat],classOf[IntWritable],classOf[BytesWritable],job.getConfiguration())valcount >> >>> >> >> =hRDD.mapPartitionsWithInputSplit{(split,iter)=>myfuncPart(split,iter)}| >> >>> >> >>> |The moment I invoke mapPartitionsWithInputSplit() method, >> I get the >> >
Re: Error : - No filesystem for scheme: spark
Yes. In two different places I use spark://

1. In my code, while creating spark configuration, I use the code below

val sConf = new SparkConf().setAppName("Dummy").setMaster("spark://:7077")
val sc = new SparkContext(sConf)

2. I run the job using the command below

spark-submit --class org.myjob --jars myjob.jar spark://:7077 myjob.jar

regards
Bala

On 2 November 2015 at 14:59, Romi Kuntsman wrote:
> except "spark.master", do you have "spark://" anywhere in your code or
> config files?
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A. <balachandar...@gmail.com> wrote:
>
>> -- Forwarded message --
>> From: "Balachandar R.A."
>> Date: 02-Nov-2015 12:53 pm
>> Subject: Re: Error : - No filesystem for scheme: spark
>> To: "Jean-Baptiste Onofré"
>> Cc:
>>
>> > HI JB,
>> > Thanks for the response,
>> > Here is the content of my spark-defaults.conf
>> >
>> > # Default system properties included when running spark-submit.
>> > # This is useful for setting default environmental settings.
>> >
>> > # Example:
>> > spark.master                     spark://fdoat:7077
>> > # spark.eventLog.enabled         true
>> > spark.eventLog.dir/home/bala/spark-logs
>> > # spark.eventLog.dir             hdfs://namenode:8021/directory
>> > # spark.serializer               org.apache.spark.serializer.KryoSerializer
>> > # spark.driver.memory            5g
>> > # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
>> >
>> > regards
>> > Bala
>> >
>> > On 2 November 2015 at 12:21, Jean-Baptiste Onofré wrote:
>> >>
>> >> Hi,
>> >>
>> >> do you have something special in conf/spark-defaults.conf (especially
>> >> on the eventLog directory) ?
>> >>
>> >> Regards
>> >> JB
>> >>
>> >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
>> >>>
>> >>> Can someone tell me at what point this error could come?
>> >>>
>> >>> In one of my use cases, I am trying to use hadoop custom input format.
>> >>> Here is my code.
>> >>>
>> >>> val hConf: Configuration = sc.hadoopConfiguration
>> >>> hConf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
>> >>> hConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
>> >>> var job = new Job(hConf)
>> >>> FileInputFormat.setInputPaths(job, new Path("hdfs:///user/bala/MyBinaryFile"));
>> >>> var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
>> >>>   classOf[IntWritable], classOf[BytesWritable], job.getConfiguration())
>> >>> val count = hRDD.mapPartitionsWithInputSplit { (split, iter) => myfuncPart(split, iter) }
>> >>>
>> >>> The moment I invoke mapPartitionsWithInputSplit() method, I get the
>> >>> below error in my spark-submit launch
>> >>>
>> >>> 15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
>> >>> 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
>> >>> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
>> >>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
>> >>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>> >>>
>> >>> Any help here to move towards fixing this will be of great help
>> >>>
>> >>> Thanks
>> >>> Bala
>> >>
>> >> --
>> >> Jean-Baptiste Onofré
>> >> jbono...@apache.org
>> >> http://blog.nanthrax.net
>> >> Talend - http://www.talend.com
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
Re: Error : - No filesystem for scheme: spark
-- Forwarded message --
From: "Balachandar R.A."
Date: 02-Nov-2015 12:53 pm
Subject: Re: Error : - No filesystem for scheme: spark
To: "Jean-Baptiste Onofré"
Cc:

> HI JB,
> Thanks for the response,
> Here is the content of my spark-defaults.conf
>
> # Default system properties included when running spark-submit.
> # This is useful for setting default environmental settings.
>
> # Example:
> spark.master                     spark://fdoat:7077
> # spark.eventLog.enabled         true
> spark.eventLog.dir/home/bala/spark-logs
> # spark.eventLog.dir             hdfs://namenode:8021/directory
> # spark.serializer               org.apache.spark.serializer.KryoSerializer
> # spark.driver.memory            5g
> # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
>
> regards
> Bala
>
> On 2 November 2015 at 12:21, Jean-Baptiste Onofré wrote:
>>
>> Hi,
>>
>> do you have something special in conf/spark-defaults.conf (especially on the eventLog directory) ?
>>
>> Regards
>> JB
>>
>> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
>>>
>>> Can someone tell me at what point this error could come?
>>>
>>> In one of my use cases, I am trying to use hadoop custom input format.
>>> Here is my code.
>>>
>>> val hConf: Configuration = sc.hadoopConfiguration
>>> hConf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
>>> hConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
>>> var job = new Job(hConf)
>>> FileInputFormat.setInputPaths(job, new Path("hdfs:///user/bala/MyBinaryFile"));
>>> var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
>>>   classOf[IntWritable], classOf[BytesWritable], job.getConfiguration())
>>> val count = hRDD.mapPartitionsWithInputSplit { (split, iter) => myfuncPart(split, iter) }
>>>
>>> The moment I invoke mapPartitionsWithInputSplit() method, I get the
>>> below error in my spark-submit launch
>>>
>>> 15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
>>> 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
>>> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
>>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
>>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>>>
>>> Any help here to move towards fixing this will be of great help
>>>
>>> Thanks
>>> Bala
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
Error : - No filesystem for scheme: spark
Can someone tell me at what point this error could come?

In one of my use cases, I am trying to use a Hadoop custom input format. Here is my code:

val hConf: Configuration = sc.hadoopConfiguration
hConf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
hConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
var job = new Job(hConf)
FileInputFormat.setInputPaths(job, new Path("hdfs:///user/bala/MyBinaryFile"));
var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
  classOf[IntWritable], classOf[BytesWritable], job.getConfiguration())
val count = hRDD.mapPartitionsWithInputSplit { (split, iter) => myfuncPart(split, iter) }

The moment I invoke the mapPartitionsWithInputSplit() method, I get the below error in my spark-submit launch:

15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

Any help here to move towards fixing this will be of great help.

Thanks
Bala
Using Hadoop Custom Input format in Spark
Hello,

I have developed a Hadoop based solution that processes a binary file. It uses the classic Hadoop MR technique. The binary file is about 10GB and is divided into 73 HDFS blocks, and the business logic, written as a map process, operates on each of these 73 blocks. We have developed a custom InputFormat and a custom RecordReader in Hadoop that return a key (IntWritable) and a value (BytesWritable) to the map function. The value is nothing but the contents of an HDFS block (binary data). The business logic knows how to read this data.

Now, I would like to port this code to Spark. I am a starter in Spark and could run simple examples (the wordcount and pi examples), but I could not find a straightforward example for processing binary files in Spark. I see two possible solutions for this use case. The first: avoid the custom input format and record reader, find a method (approach) in Spark that creates an RDD for those HDFS blocks, and use a map-like method that feeds each HDFS block's content to the business logic. If this is not possible, I would like to reuse the custom input format and custom record reader using methods such as the Hadoop API, HadoopRDD, etc.

My problem: I do not know whether the first approach is possible or not. If it is, can anyone please provide some pointers to examples? I was trying the second approach, but was highly unsuccessful. Here is the code snippet I used:

object Driver {
  def myFunc(key: IntWritable, content: BytesWritable) = {
    println("my business logic")   // printing key and content; the value/size is 0
  }

  def main(args: Array[String]) {
    // create a spark context
    val conf = new SparkConf().setAppName("Dummy").setMaster("spark://:7077")
    val sc = new SparkContext(conf)
    val rd = sc.newAPIHadoopFile("hdfs:///user/name/MyDataFile.dat",
      classOf[RandomAccessInputFormat], classOf[IntWritable], classOf[BytesWritable])
    val count = rd.map(x => myFunc(x._1, x._2)).collect()
  }
}

Can someone tell me where I am going wrong here? I think I am not using the API the right way, but I failed to find documentation/usage examples.

Thanks in advance

- bala
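On the first approach, a minimal sketch using sc.binaryFiles follows. Note the assumption it rests on: binaryFiles yields one record per file (a (path, PortableDataStream) pair), not one per HDFS block, so it fits this use case only if the data can be laid out as one file per unit of work; for true per-block processing, the custom input format route (newAPIHadoopFile / NewHadoopRDD) remains the one to use. The HDFS path is a placeholder.

import org.apache.spark.input.PortableDataStream

// each element is (file path, lazily readable stream over the whole file)
val files = sc.binaryFiles("hdfs:///user/name/binaryData/")

val results = files.map { case (path: String, stream: PortableDataStream) =>
  val bytes: Array[Byte] = stream.toArray()   // materialize the file's bytes
  // ... feed `bytes` to the business logic here ...
  (path, bytes.length)
}.collect()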