Re: Issue in spark job. Remote rpc client dissociated

2016-07-14 Thread Balachandar R.A.
Hello,

The variable argsList is an array defined above the parallel block. This
variable is accessed inside the map function. Launcher.main is not
thread-safe. Is it not possible to tell Spark that every folder needs to be
processed as a separate process in a separate working directory?
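One way to get that isolation, sketched below in Scala for brevity (the same
idea applies from the Java API), is to give every task its own copy of the
arguments and launch Launcher in a child JVM with its own working directory.
The folders list, the argsTemplate array, the launcher jar path and the
work-directory prefix are placeholders, not taken from the real setup:

import java.io.File
import scala.collection.JavaConverters._

val results = sc.parallelize(folders, folders.size).map { folder =>
  // build the arguments locally so no shared array is ever mutated across tasks
  val localArgs = argsTemplate.clone()
  localArgs(3) = localArgs(3) + "/" + folder          // input folder on the shared file system
  localArgs(7) = localArgs(7) + "/" + folder + ".csv" // output csv on the shared file system

  // give the child process its own working directory
  val workDir = new File("/tmp/launcher-work/" + folder)
  workDir.mkdirs()

  // run the black-box Launcher in a separate JVM instead of calling Launcher.main in-process
  val cmd = (Seq("java", "-cp", "/path/to/launcher.jar", "Launcher") ++ localArgs).asJava
  new ProcessBuilder(cmd).directory(workDir).inheritIO().start().waitFor()
}.collect()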

Regards
Bala
On 14-Jul-2016 2:37 pm, "Sun Rui"  wrote:

> Where is argsList defined? is Launcher.main() thread-safe? Note that if
> multiple folders are processed in a node, multiple threads may concurrently
> run in the executor, each processing a folder.
>
> On Jul 14, 2016, at 12:28, Balachandar R.A. 
> wrote:
>
> Hello Ted,
>>
>
>
> Thanks for the response. Here is the additional information.
>
>
>> I am using spark 1.6.1  (spark-1.6.1-bin-hadoop2.6)
>>
>> Here is the code snippet
>>
>>
>> JavaRDD<File> add = jsc.parallelize(listFolders, listFolders.size());
>> JavaRDD<Integer> test = add.map(new Function<File, Integer>() {
>>     @Override
>>     public Integer call(File file) throws Exception {
>>         String folder = file.getName();
>>         System.out.println("[x] Processing dataset from the directory " + folder);
>>         int status = 0;
>>         // Full path of the input folder. The input folder is on a shared file system
>>         // that every worker node can access, something like "/home/user/software/data/",
>>         // and the folder name is like "20161307".
>>         argsList[3] = argsList[3] + "/" + folder;
>>         // Full path of the output CSV file.
>>         argsList[7] = argsList[7] + "/" + folder + ".csv";
>>         try {
>>             // Launcher is a black box. It processes the input folder and creates a CSV
>>             // file in the output location (argsList[7]), also on the shared file system.
>>             Launcher.main(argsList);
>>             status = 0;
>>         } catch (Exception e) {
>>             System.out.println("[x] Execution of import tool for the directory " + folder + "failed");
>>             status = 0;
>>         }
>>         accum.add(1);
>>         return status;
>>     }
>> });
>>
>>
>> Here is the spark-env.sh
>>
>> export SPARK_WORKER_INSTANCES=1
>> export JAVA_HOME=/home/work_IW1/opt/jdk1.8.0_77/
>> export HADOOP_CONF_DIR=/home/work_IW1/opt/hadoop-2.7.2/etc/hadoop
>>
>> Here is the spark-defaults.conf
>>
>>
>>   spark.master                     spark://master:7077
>>   spark.eventLog.enabled           true
>>   spark.eventLog.dir               hdfs://master:9000/sparkEvent
>>   spark.serializer                 org.apache.spark.serializer.KryoSerializer
>>   spark.driver.memory              4g
>>
>>
>
>
> Hope it helps.
>
>
>


Re: Issue in spark job. Remote rpc client dissociated

2016-07-13 Thread Balachandar R.A.
>
> Hello Ted,
>


Thanks for the response. Here is the additional information.


> I am using spark 1.6.1 (spark-1.6.1-bin-hadoop2.6)
>
> Here is the code snippet
>
> JavaRDD<File> add = jsc.parallelize(listFolders, listFolders.size());
> JavaRDD<Integer> test = add.map(new Function<File, Integer>() {
>     @Override
>     public Integer call(File file) throws Exception {
>         String folder = file.getName();
>         System.out.println("[x] Processing dataset from the directory " + folder);
>         int status = 0;
>         // Full path of the input folder. The input folder is on a shared file system
>         // that every worker node can access, something like "/home/user/software/data/",
>         // and the folder name is like "20161307".
>         argsList[3] = argsList[3] + "/" + folder;
>         // Full path of the output CSV file.
>         argsList[7] = argsList[7] + "/" + folder + ".csv";
>         try {
>             // Launcher is a black box. It processes the input folder and creates a CSV
>             // file in the output location (argsList[7]), also on the shared file system.
>             Launcher.main(argsList);
>             status = 0;
>         } catch (Exception e) {
>             System.out.println("[x] Execution of import tool for the directory " + folder + "failed");
>             status = 0;
>         }
>         accum.add(1);
>         return status;
>     }
> });
>
> Here is the spark-env.sh
>
> export SPARK_WORKER_INSTANCES=1
> export JAVA_HOME=/home/work_IW1/opt/jdk1.8.0_77/
> export HADOOP_CONF_DIR=/home/work_IW1/opt/hadoop-2.7.2/etc/hadoop
>
> Here is the spark-defaults.conf
>
>   spark.master                     spark://master:7077
>   spark.eventLog.enabled           true
>   spark.eventLog.dir               hdfs://master:9000/sparkEvent
>   spark.serializer                 org.apache.spark.serializer.KryoSerializer
>   spark.driver.memory              4g
>


Hope it helps.


Issue in spark job. Remote rpc client dissociated

2016-07-13 Thread Balachandar R.A.
Hello

In one of my use cases, I need to process a list of folders in parallel. I
used
sc.parallelize(list, list.size).map("logic to process the folder").
I have a six-node cluster and there are six folders to process. Ideally I
expect each of my nodes to process one folder. But I see that one node
processes multiple folders while one or two of the nodes do not get any job.
In the end, the spark-submit crashes with an exception saying "remote RPC
client dissociated". Can someone give me a hint on what's going wrong here?
Please note that this issue does not arise if I comment out my logic that
processes the folder and simply print the folder name. In that case, every
node gets one folder to process. I inserted a sleep of 40 seconds inside
the map: no issue. But when I uncomment my logic I see this issue. Also,
before crashing it does process some of the folders successfully.
"Successfully" means the business logic generates a file in a shared file
system.
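For completeness, one way to keep each node to a single folder at a time on a
standalone cluster might be to cap the cores, for example in
spark-defaults.conf (the values below assume one executor per node on the
six-node cluster):

  spark.executor.cores   1
  spark.cores.max        6

With one core per executor, an executor never runs two folder tasks
concurrently.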

Regards
Bala


Spark job state is EXITED but does not return

2016-07-11 Thread Balachandar R.A.
Hello,

I have a simple Apache Spark based use case that processes two datasets.
Each dataset takes about 5-7 min to process. I am doing this processing
inside the sc.parallelize(datasets){ } block. While the first dataset is
processed successfully, the processing of the second dataset is not started
by Spark. The application state is RUNNING, but in the executor summary I
notice that the state is EXITED. Can someone tell me where things are going
wrong?

Regards
Bala


Re: One map per folder in spark or Hadoop

2016-07-07 Thread Balachandar R.A.
Hi

Thanks for the code snippet. What if the executable inside the map process
needs to access directories and files present in the local file system? Is
that possible? I know the tasks run on the slave nodes in a temporary
working directory, and I can think of the distributed cache. But I would
still like to know whether the map process can access the local file system.
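If the needed files are small enough to ship, SparkContext.addFile together
with SparkFiles.get can play the role of the distributed cache; a minimal
sketch (the path and the folders list are placeholders):

import org.apache.spark.SparkFiles

sc.addFile("/home/user/config/settings.conf")        // copied to every executor's work directory
val results = sc.parallelize(folders, folders.size).map { folder =>
  val localCopy = SparkFiles.get("settings.conf")    // absolute path of the shipped copy on this worker
  // the executable can read localCopy, or any absolute path that already exists on the worker
  localCopy
}.collect()

Otherwise, a task can open any absolute path with plain java.io, as long as
that path exists on every worker node.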

Regards
Bala
On 01-Jul-2016 7:46 am, "Sun Rui"  wrote:

> Say you have got all of your folder paths into a val folders: Seq[String]
>
> val add = sc.parallelize(folders, folders.size).mapPartitions { iter =>
>   val folder = iter.next
>   val status: Int = 
>   Seq(status).toIterator
> }
>
> On Jun 30, 2016, at 16:42, Balachandar R.A. 
> wrote:
>
> Hello,
>
> I have some 100 folders. Each folder contains 5 files. I have an
> executable that processes one folder. The executable is a black box and
> hence it cannot be modified. I would like to process the 100 folders in
> parallel using Apache Spark so that I can spawn a map task per folder. Can
> anyone give me an idea? I have come across similar questions, but for
> Hadoop, and the answer was to use combineFileInputFormat and pathFilter.
> However, as I said, I want to use Apache Spark. Any idea?
>
> Regards
> Bala
>
>
>


Re: One map per folder in spark or Hadoop

2016-06-30 Thread Balachandar R.A.
Thank you very much.  I will try this code and update you

Regards
Bala
On 01-Jul-2016 7:46 am, "Sun Rui"  wrote:

> Say you have got all of your folder paths into a val folders: Seq[String]
>
> val add = sc.parallelize(folders, folders.size).mapPartitions { iter =>
>   val folder = iter.next
>   val status: Int = 
>   Seq(status).toIterator
> }
>
> On Jun 30, 2016, at 16:42, Balachandar R.A. 
> wrote:
>
> Hello,
>
> I have some 100 folders. Each folder contains 5 files. I have an
> executable that processes one folder. The executable is a black box and
> hence it cannot be modified. I would like to process the 100 folders in
> parallel using Apache Spark so that I can spawn a map task per folder. Can
> anyone give me an idea? I have come across similar questions, but for
> Hadoop, and the answer was to use combineFileInputFormat and pathFilter.
> However, as I said, I want to use Apache Spark. Any idea?
>
> Regards
> Bala
>
>
>


One map per folder in spark or Hadoop

2016-06-30 Thread Balachandar R.A.
Hello,

I have some 100 folders. Each folder contains 5 files. I have an executable
that processes one folder. The executable is a black box and hence it cannot
be modified. I would like to process the 100 folders in parallel using Apache
Spark so that I can spawn a map task per folder. Can anyone give me an idea?
I have come across similar questions, but for Hadoop, and the answer was to
use combineFileInputFormat and pathFilter. However, as I said, I want to use
Apache Spark. Any idea?
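A rough sketch of that, assuming the black-box executable takes the folder
path as its only argument (the executable path and folder list below are
made up):

import scala.sys.process._

val folders: Seq[String] = Seq("/data/f001", "/data/f002")   // placeholder folder paths
val exitCodes = sc.parallelize(folders, folders.size).map { folder =>
  // one partition per folder, so each folder is handled by exactly one map task
  Seq("/opt/tools/process_folder", folder).!                 // runs the executable, returns its exit code
}.collect()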

Regards
Bala


How to calculate weighted degrees in GraphX

2016-02-01 Thread Balachandar R.A.
I am new to GraphX and exploring an example of flight data analysis found
online:
http://www.sparktutorials.net/analyzing-flight-data:-a-gentle-introduction-to-graphx-in-spark

I tried calculating inDegrees (to understand how many incoming flights an
airport has), but I see a value that corresponds to the number of unique
routes incoming to the airport. This means that if there exists more than
one flight between a particular source and destination, let's say 5, I am
seeing only 1 for this route. What I want is something along the lines of
weighted degrees, where the weight associated with each edge is considered
while calculating the indegrees. This weight value is in the edge attr
parameter, which is not being considered when computing the degrees. Here
is my code block

val inDegrees = graph.inDegrees.join(airportVertices).sortBy(_._2._1, ascending = false).take(10)
val outDegrees = graph.outDegrees.join(airportVertices).sortBy(_._2._1, ascending = false).take(10)

// Top 10 indegrees
println("Top 10 indegrees -> ")
inDegrees.foreach(println)

// Top 10 outdegrees
println("Top 10 outDegrees -> ")
outDegrees.foreach(println)

Can someone point me to the right link or provide a hint to solve this?
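One possible approach, sketched here with aggregateMessages and assuming the
edge attribute is an integer weight such as the number of flights on the
route:

val weightedInDegrees = graph.aggregateMessages[Int](
  edge => edge.sendToDst(edge.attr),   // send each edge's weight to its destination airport
  (a, b) => a + b                      // sum the weights arriving at each airport
)

// Top 10 weighted indegrees, joined with the airport names as before
weightedInDegrees.join(airportVertices).sortBy(_._2._1, ascending = false).take(10).foreach(println)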

regards

Bala


Re: GraphX can show graph?

2016-01-29 Thread Balachandar R.A.
Thanks... Will look into that

- Bala

On 28 January 2016 at 15:36, Sahil Sareen  wrote:

> Try Neo4j for visualization; GraphX does a pretty good job at distributed
> graph processing.
>
> On Thu, Jan 28, 2016 at 12:42 PM, Balachandar R.A. <
> balachandar...@gmail.com> wrote:
>
>> Hi
>>
>> I am new to GraphX. I have a simple csv file which I could load and
>> compute a few graph statistics on. However, I am not sure whether it is
>> possible to create and show a graph (for visualization purposes) using
>> GraphX. Any pointer to a tutorial or information connected to this would
>> be really helpful.
>>
>> Thanks and regards
>> Bala
>>
>
>


GraphX can show graph?

2016-01-27 Thread Balachandar R.A.
Hi

I am new to GraphX. I have a simple csv file which I could load and compute
a few graph statistics on. However, I am not sure whether it is possible to
create and show a graph (for visualization purposes) using GraphX. Any
pointer to a tutorial or information connected to this would be really
helpful.
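Since GraphX itself has no rendering layer, one workable route is to dump the
vertices and edges and draw them with an external tool (Gephi, Neo4j, d3 and
so on); a small sketch with placeholder output paths:

graph.edges.map(e => s"${e.srcId},${e.dstId},${e.attr}").saveAsTextFile("/tmp/graph-edges")
graph.vertices.map { case (id, attr) => s"$id,$attr" }.saveAsTextFile("/tmp/graph-vertices")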

Thanks and regards
Bala


spark-shell issue Job in illegal state & sparkcontext not serializable

2015-11-20 Thread Balachandar R.A.
Hello users,

In one of my use cases, I need to launch a Spark job from spark-shell. My
input file is in HDFS, and I am using NewHadoopRDD to construct an RDD out
of this input file, as it uses a custom input format.

  val hConf = sc.hadoopConfiguration

  var job = new Job(hConf)

  FileInputFormat.setInputPaths(job, new Path(path));

  var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
                              classOf[IntWritable],
                              classOf[BytesWritable],
                              job.getConfiguration())


However, when I run these commands in spark-shell, especially the 2nd line,
I get the below exception

java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING


So, I put these lines inside a method in another class. In my driver code,
I am calling the method.


  // obtain RDD for the input file in HDFS
  var hRDD = cas.getRDD("/user/bala/RDF_STORE/MERGED_DAR.DAT_256")


The above line constructs the Hadoop RDD and returns the handle to the driver
code. This works fine now, which means I could work around the
IllegalStateException issue. However, when I run the command below


  // run the map and collect the results from the distributed machines
  val result = hRDD.mapPartitionsWithInputSplit { (split, iter) =>
    cas.extractCAS(split, iter)
  }.collect()




I get an error "Caused by: java.io.NotSerializableException:
org.apache.spark.SparkContext"


Can someone help me work around this problem? This code works perfectly
when I use spark-submit, but I cannot use that. My ultimate goal is to run
the driver code from Zeppelin, and hence I am testing this code in
spark-shell.
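One pattern that usually avoids the NotSerializableException, sketched below
and not verified against this exact setup, is to keep the per-split logic in
an object that holds no SparkContext (or to mark the context field in the cas
class as @transient), so the closure no longer drags the context along:

import org.apache.hadoop.io.{BytesWritable, IntWritable}
import org.apache.hadoop.mapreduce.InputSplit

// hypothetical stand-in for the cas helper: it has no SparkContext field, so it serializes cleanly
object CasFunctions extends Serializable {
  def extractCAS(split: InputSplit,
                 iter: Iterator[(IntWritable, BytesWritable)]): Iterator[Int] = {
    // the real business logic on the records of this split goes here
    iter.map(_ => 1)
  }
}

val result = hRDD.mapPartitionsWithInputSplit { (split, iter) =>
  CasFunctions.extractCAS(split, iter)
}.collect()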



with regards

Bala


Re: converting categorical values in csv file to numerical values

2015-11-05 Thread Balachandar R.A.
Hi Guillaume,


This is always an option. However, I read about HashingTF, which does
exactly this quite efficiently and can scale too. Hence, I am looking for a
solution using this technique.
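A minimal sketch of that direction, assuming the rows are already parsed into
arrays of strings and the categorical columns sit at known indices (both the
indices and the rows RDD below are placeholders):

import org.apache.spark.mllib.feature.HashingTF

val stringCols = Set(0, 3, 7)              // hypothetical indices of the categorical columns
val hashingTF = new HashingTF(1 << 10)     // 1024 hash buckets

// rows: RDD[Array[String]] parsed from the csv file
val numericRows = rows.map { cols =>
  cols.zipWithIndex.map { case (value, i) =>
    if (stringCols.contains(i)) hashingTF.indexOf(value).toDouble  // stable hashed index for the category
    else value.toDouble                                            // numeric columns pass through
  }
}

Note that hashed indices can collide and are not the 0..k-1 category ids that
categoricalFeaturesInfo expects, so a plain dictionary built with
distinct().zipWithIndex() may still be the cleaner fit for random forests.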


regards
Bala


On 5 November 2015 at 18:50, tog  wrote:

> Hi Bala
>
> Can't you do a simple dictionnary and map those values to numbers?
>
> Cheers
> Guillaume
>
> On 5 November 2015 at 09:54, Balachandar R.A. 
> wrote:
>
>> HI
>>
>>
>> I am new to Spark MLlib and machine learning. I have a csv file that
>> consists of around 100 thousand rows and 20 columns. Of these 20 columns,
>> 10 contain string values. The values in these columns are not necessarily
>> unique. They are kind of categorical, that is, each value could be one
>> among, say, 10 values. To start with, I could run the examples, especially
>> the random forest algorithm, on my local Spark (1.5.1) platform. However, I
>> have a challenge with my dataset due to these strings, as the APIs take
>> numerical values. Can anyone tell me how I can map these categorical
>> values (strings) to numbers and use them with the random forest algorithm?
>> Any example will be greatly appreciated.
>>
>>
>> regards
>>
>> Bala
>>
>
>
>
> --
> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>


converting categorical values in csv file to numerical values

2015-11-05 Thread Balachandar R.A.
HI


I am new to Spark MLlib and machine learning. I have a csv file that
consists of around 100 thousand rows and 20 columns. Of these 20 columns,
10 contain string values. The values in these columns are not necessarily
unique. They are kind of categorical, that is, each value could be one
among, say, 10 values. To start with, I could run the examples, especially
the random forest algorithm, on my local Spark (1.5.1) platform. However, I
have a challenge with my dataset due to these strings, as the APIs take
numerical values. Can anyone tell me how I can map these categorical
values (strings) to numbers and use them with the random forest algorithm?
Any example will be greatly appreciated.


regards

Bala


Re: Error : - No filesystem for scheme: spark

2015-11-02 Thread Balachandar R.A.
It seems I made a stupid mistake. I supplied the --master option with the
Spark URL in my launch command, and this error is gone.
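For anyone hitting the same thing: the master URL has to be passed behind
--master rather than as a bare argument, so the launch looks something like
this (master host elided as in the original command):

spark-submit --master spark://:7077 --class org.myjob --jars myjob.jar myjob.jar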

Thanks for pointing out possible places for troubleshooting

Regards
Bala
On 02-Nov-2015 3:15 pm, "Balachandar R.A."  wrote:

> No, I am not using YARN. YARN is not running in my cluster. So it is a
> standalone one.
>
> Regards
> Bala
> On 02-Nov-2015 3:11 pm, "Jean-Baptiste Onofré"  wrote:
>
>> Just to be sure: you use yarn cluster (not standalone), right ?
>>
>> Regards
>> JB
>>
>> On 11/02/2015 10:37 AM, Balachandar R.A. wrote:
>>
>>> Yes. In two different places I use spark://
>>>
>>> 1. In my code, while creating spark configuration, I use the code below
>>>
>>> val sConf = new SparkConf().setAppName("Dummy").setMaster("spark://:7077")
>>> val sc = new SparkContext(sConf)
>>>
>>>
>>> 2. I run the job using the command below
>>>
>>> spark-submit  --class org.myjob  --jars myjob.jar spark://:7077
>>> myjob.jar
>>>
>>> regards
>>> Bala
>>>
>>>
>>> On 2 November 2015 at 14:59, Romi Kuntsman <r...@totango.com> wrote:
>>>
>>> except "spark.master", do you have "spark://" anywhere in your code
>>> or config files?
>>>
>>> Romi Kuntsman, Big Data Engineer
>>> http://www.totango.com
>>>
>>> On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A. <balachandar...@gmail.com> wrote:
>>>
>>>
>>> -- Forwarded message --
>>> From: "Balachandar R.A." <balachandar...@gmail.com>
>>> Date: 02-Nov-2015 12:53 pm
>>> Subject: Re: Error : - No filesystem for scheme: spark
>>> To: "Jean-Baptiste Onofré" <j...@nanthrax.net>
>>> Cc:
>>>
>>>  > HI JB,
>>>  > Thanks for the response,
>>>  > Here is the content of my spark-defaults.conf
>>>  >
>>>  >
>>>  > # Default system properties included when running
>>> spark-submit.
>>>  > # This is useful for setting default environmental settings.
>>>  >
>>>  > # Example:
>>>  >  spark.master spark://fdoat:7077
>>>  > # spark.eventLog.enabled   true
>>>  >  spark.eventLog.dir/home/bala/spark-logs
>>>  > # spark.eventLog.dir
>>>  hdfs://namenode:8021/directory
>>>  > # spark.serializer
>>> org.apache.spark.serializer.KryoSerializer
>>>  > # spark.driver.memory  5g
>>>  > # spark.executor.extraJavaOptions  -XX:+PrintGCDetails
>>> -Dkey=value -Dnumbers="one two three"
>>>  >
>>>  >
>>>  > regards
>>>  > Bala
>>>
>>>
>>>  >
>>>  > On 2 November 2015 at 12:21, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>  >>
>>>  >> Hi,
>>>  >>
>>>  >> do you have something special in conf/spark-defaults.conf
>>> (especially on the eventLog directory) ?
>>>  >>
>>>  >> Regards
>>>  >> JB
>>>  >>
>>>  >>
>>>  >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
>>>  >>>
>>>  >>> Can someone tell me at what point this error could come?
>>>  >>>
>>>  >>> In one of my use cases, I am trying to use hadoop custom
>>> input format.
>>>  >>> Here is my code.
>>>  >>>
>>>  >>> |valhConf:Configuration=sc.hadoopConfiguration
>>>  >>>
>>>
>>> hConf.set("fs.hdfs.impl",classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)hConf.set("fs.file.impl",classOf[org.apache.hadoop.fs.LocalFileSystem].ge

Re: Error : - No filesystem for scheme: spark

2015-11-02 Thread Balachandar R.A.
No, I am not using YARN. YARN is not running in my cluster. So it is a
standalone one.

Regards
Bala
On 02-Nov-2015 3:11 pm, "Jean-Baptiste Onofré"  wrote:

> Just to be sure: you use yarn cluster (not standalone), right ?
>
> Regards
> JB
>
> On 11/02/2015 10:37 AM, Balachandar R.A. wrote:
>
>> Yes. In two different places I use spark://
>>
>> 1. In my code, while creating spark configuration, I use the code below
>>
>> val sConf = new SparkConf().setAppName("Dummy").setMaster("spark://:7077")
>> val sc = new SparkContext(sConf)
>>
>>
>> 2. I run the job using the command below
>>
>> spark-submit  --class org.myjob  --jars myjob.jar spark://:7077
>> myjob.jar
>>
>> regards
>> Bala
>>
>>
>> On 2 November 2015 at 14:59, Romi Kuntsman <r...@totango.com> wrote:
>>
>> except "spark.master", do you have "spark://" anywhere in your code
>> or config files?
>>
>> Romi Kuntsman, Big Data Engineer
>> http://www.totango.com
>>
>> On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A. <balachandar...@gmail.com> wrote:
>>
>>
>> -- Forwarded message --
>> From: "Balachandar R.A." <balachandar...@gmail.com>
>> Date: 02-Nov-2015 12:53 pm
>> Subject: Re: Error : - No filesystem for scheme: spark
>> To: "Jean-Baptiste Onofré" <j...@nanthrax.net>
>> Cc:
>>
>>  > HI JB,
>>  > Thanks for the response,
>>  > Here is the content of my spark-defaults.conf
>>  >
>>  >
>>  > # Default system properties included when running spark-submit.
>>  > # This is useful for setting default environmental settings.
>>  >
>>  > # Example:
>>  >  spark.master spark://fdoat:7077
>>  > # spark.eventLog.enabled   true
>>  >  spark.eventLog.dir/home/bala/spark-logs
>>  > # spark.eventLog.dir
>>  hdfs://namenode:8021/directory
>>  > # spark.serializer
>> org.apache.spark.serializer.KryoSerializer
>>  > # spark.driver.memory  5g
>>  > # spark.executor.extraJavaOptions  -XX:+PrintGCDetails
>> -Dkey=value -Dnumbers="one two three"
>>  >
>>  >
>>  > regards
>>  > Bala
>>
>>
>>  >
>>  > On 2 November 2015 at 12:21, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>  >>
>>  >> Hi,
>>  >>
>>  >> do you have something special in conf/spark-defaults.conf
>> (especially on the eventLog directory) ?
>>  >>
>>  >> Regards
>>  >> JB
>>  >>
>>  >>
>>  >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
>>  >>>
>>  >>> Can someone tell me at what point this error could come?
>>  >>>
>>  >>> In one of my use cases, I am trying to use hadoop custom
>> input format.
>>  >>> Here is my code.
>>  >>>
>>  >>> |valhConf:Configuration=sc.hadoopConfiguration
>>  >>>
>>
>> hConf.set("fs.hdfs.impl",classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)hConf.set("fs.file.impl",classOf[org.apache.hadoop.fs.LocalFileSystem].getName)varjob
>>  >>>
>>
>> =newJob(hConf)FileInputFormat.setInputPaths(job,newPath("hdfs:///user/bala/MyBinaryFile"));varhRDD
>>  >>>
>>
>> =newNewHadoopRDD(sc,classOf[RandomAccessInputFormat],classOf[IntWritable],classOf[BytesWritable],job.getConfiguration())valcount
>>  >>>
>>
>> =hRDD.mapPartitionsWithInputSplit{(split,iter)=>myfuncPart(split,iter)}|
>>  >>>
>>  >>> |The moment I invoke mapPartitionsWithInputSplit() method,
>> I get the
>>  >

Re: Error : - No filesystem for scheme: spark

2015-11-02 Thread Balachandar R.A.
Yes. In two different places I use spark://

1. In my code, while creating spark configuration, I use the code below

val sConf = new SparkConf().setAppName("Dummy").setMaster("spark://:7077")
val sc = new SparkContext(sConf)


2. I run the job using the command below

spark-submit  --class org.myjob  --jars myjob.jar spark://:7077
myjob.jar

regards
Bala


On 2 November 2015 at 14:59, Romi Kuntsman  wrote:

> except "spark.master", do you have "spark://" anywhere in your code or
> config files?
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A. <
> balachandar...@gmail.com> wrote:
>
>>
>> -- Forwarded message --
>> From: "Balachandar R.A." 
>> Date: 02-Nov-2015 12:53 pm
>> Subject: Re: Error : - No filesystem for scheme: spark
>> To: "Jean-Baptiste Onofré" 
>> Cc:
>>
>> > HI JB,
>> > Thanks for the response,
>> > Here is the content of my spark-defaults.conf
>> >
>> >
>> > # Default system properties included when running spark-submit.
>> > # This is useful for setting default environmental settings.
>> >
>> > # Example:
>> >  spark.master                     spark://fdoat:7077
>> > # spark.eventLog.enabled         true
>> >  spark.eventLog.dir              /home/bala/spark-logs
>> > # spark.eventLog.dir             hdfs://namenode:8021/directory
>> > # spark.serializer               org.apache.spark.serializer.KryoSerializer
>> > # spark.driver.memory            5g
>> > # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
>> >
>> >
>> > regards
>> > Bala
>>
>> >
>> > On 2 November 2015 at 12:21, Jean-Baptiste Onofré 
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> do you have something special in conf/spark-defaults.conf (especially
>> on the eventLog directory) ?
>> >>
>> >> Regards
>> >> JB
>> >>
>> >>
>> >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
>> >>>
>> >>> Can someone tell me at what point this error could come?
>> >>>
>> >>> In one of my use cases, I am trying to use hadoop custom input format.
>> >>> Here is my code.
>> >>>
>> >>> val hConf: Configuration = sc.hadoopConfiguration
>> >>> hConf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
>> >>> hConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
>> >>>
>> >>> var job = new Job(hConf)
>> >>> FileInputFormat.setInputPaths(job, new Path("hdfs:///user/bala/MyBinaryFile"))
>> >>>
>> >>> var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
>> >>>                             classOf[IntWritable],
>> >>>                             classOf[BytesWritable],
>> >>>                             job.getConfiguration())
>> >>>
>> >>> val count = hRDD.mapPartitionsWithInputSplit { (split, iter) => myfuncPart(split, iter) }
>> >>>
>> >>> The moment I invoke the mapPartitionsWithInputSplit() method, I get the
>> >>> below error in my spark-submit launch
>> >>>
>> >>> 15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
>> >>> 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
>> >>> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
>> >>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
>> >>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>> >>>
>> >>> Any help here to move towards fixing this will be of great help
>> >>>
>> >>>
>> >>>
>> >>> Thanks
>> >>>
>> >>> Bala
>> >>>
>> >>
>> >> --
>> >> Jean-Baptiste Onofré
>> >> jbono...@apache.org
>> >> http://blog.nanthrax.net
>> >> Talend - http://www.talend.com
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >
>>
>>
>


Re: Error : - No filesystem for scheme: spark

2015-11-02 Thread Balachandar R.A.
-- Forwarded message --
From: "Balachandar R.A." 
Date: 02-Nov-2015 12:53 pm
Subject: Re: Error : - No filesystem for scheme: spark
To: "Jean-Baptiste Onofré" 
Cc:

> HI JB,
> Thanks for the response,
> Here is the content of my spark-defaults.conf
>
>
> # Default system properties included when running spark-submit.
> # This is useful for setting default environmental settings.
>
> # Example:
>  spark.master                     spark://fdoat:7077
> # spark.eventLog.enabled         true
>  spark.eventLog.dir              /home/bala/spark-logs
> # spark.eventLog.dir             hdfs://namenode:8021/directory
> # spark.serializer               org.apache.spark.serializer.KryoSerializer
> # spark.driver.memory            5g
> # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
>
>
> regards
> Bala
>
> On 2 November 2015 at 12:21, Jean-Baptiste Onofré  wrote:
>>
>> Hi,
>>
>> do you have something special in conf/spark-defaults.conf (especially on
the eventLog directory) ?
>>
>> Regards
>> JB
>>
>>
>> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
>>>
>>> Can someone tell me at what point this error could come?
>>>
>>> In one of my use cases, I am trying to use hadoop custom input format.
>>> Here is my code.
>>>
>>> val hConf: Configuration = sc.hadoopConfiguration
>>> hConf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
>>> hConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
>>>
>>> var job = new Job(hConf)
>>> FileInputFormat.setInputPaths(job, new Path("hdfs:///user/bala/MyBinaryFile"))
>>>
>>> var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
>>>                             classOf[IntWritable],
>>>                             classOf[BytesWritable],
>>>                             job.getConfiguration())
>>>
>>> val count = hRDD.mapPartitionsWithInputSplit { (split, iter) => myfuncPart(split, iter) }
>>>
>>> The moment I invoke the mapPartitionsWithInputSplit() method, I get the
>>> below error in my spark-submit launch
>>>
>>> 15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
>>> 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
>>> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
>>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
>>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>>>
>>> Any help here to move towards fixing this will be of great help
>>>
>>>
>>>
>>> Thanks
>>>
>>> Bala
>>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>


Error : - No filesystem for scheme: spark

2015-11-01 Thread Balachandar R.A.
Can someone tell me at what point this error could come?

In one of my use cases, I am trying to use hadoop custom input format. Here
is my code.

val hConf: Configuration = sc.hadoopConfiguration
hConf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
hConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)

var job = new Job(hConf)
FileInputFormat.setInputPaths(job, new Path("hdfs:///user/bala/MyBinaryFile"))

var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
                            classOf[IntWritable],
                            classOf[BytesWritable],
                            job.getConfiguration())

val count = hRDD.mapPartitionsWithInputSplit { (split, iter) =>
  myfuncPart(split, iter)
}

The moment I invoke mapPartitionsWithInputSplit() method, I get the
below error in my spark-submit launch


15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in
stage 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem
for scheme: spark
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

Any help here to move towards fixing this will be of great help



Thanks

Bala


Using Hadoop Custom Input format in Spark

2015-10-27 Thread Balachandar R.A.
Hello,


I have developed a Hadoop-based solution that processes a binary file. This
uses the classic Hadoop MR technique. The binary file is about 10 GB and
divided into 73 HDFS blocks, and the business logic, written as a map
process, operates on each of these 73 blocks. We have developed a
customInputFormat and CustomRecordReader in Hadoop that return a key
(IntWritable) and a value (BytesWritable) to the map function. The value is
nothing but the contents of an HDFS block (binary data). The business logic
knows how to read this data.

Now, I would like to port this code to Spark. I am a starter in Spark and
could run simple examples (wordcount, the pi example) in Spark. However, I
could not find a straightforward example to process binary files in Spark.
I see two solutions for this use case. In the first, avoid using the custom
input format and record reader: find a method (approach) in Spark that
creates an RDD for those HDFS blocks, and use a map-like method that feeds
the HDFS block content to the business logic. If this is not possible, I
would like to re-use the custom input format and custom reader using methods
such as the Hadoop API, HadoopRDD, etc. My problem: I do not know whether
the first approach is possible or not. If it is, can anyone please provide
some pointers that contain examples? I was trying the second approach but
was highly unsuccessful. Here is the code snippet I used

object Driver {
  def myFunc(key: IntWritable, content: BytesWritable) = {
    println("my business logic")
    // printing key and content; value/size is 0
  }

  def main(args: Array[String]) {
    // create a spark context
    val conf = new SparkConf().setAppName("Dummy").setMaster("spark://:7077")
    val sc = new SparkContext(conf)
    val rd = sc.newAPIHadoopFile("hdfs:///user/name/MyDataFile.dat",
      classOf[RandomAccessInputFormat], classOf[IntWritable], classOf[BytesWritable])
    val count = rd.map(x => myFunc(x._1, x._2)).collect()
  }
}

Can someone tell me where I am going wrong here? I think I am not using the
API the right way, but I failed to find documentation or usage examples.
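On the second approach, one detail worth double-checking (a sketch under that
assumption, not verified here): Hadoop record readers may reuse the same
Writable instances across records, so the bytes are best copied out before
the business logic uses them:

import org.apache.hadoop.io.{BytesWritable, IntWritable}

val rd = sc.newAPIHadoopFile("hdfs:///user/name/MyDataFile.dat",
  classOf[RandomAccessInputFormat], classOf[IntWritable], classOf[BytesWritable])

val count = rd.map { case (key, value) =>
  // copy the payload out of the possibly reused Writable before handing it to the business logic
  val bytes = java.util.Arrays.copyOf(value.getBytes, value.getLength)
  myFunc(key, new BytesWritable(bytes))
}.count()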


Thanks in advance

- bala