Re: Submitting Spark Applications using Spark Submit

2015-06-20 Thread Raghav Shankar
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Also, the above error says: connection refused to 
ec2-XXX.compute-1.amazonaws.com/10.165.103.16:7077. I don’t understand 
where it gets the 10.165.103.16 from; I never specify that in the master URL 
command-line parameter. Any ideas on what I might be doing wrong?

 On Jun 19, 2015, at 7:19 PM, Andrew Or and...@databricks.com wrote:
 
 Hi Raghav,
 
 I'm assuming you're using standalone mode. When using the Spark EC2 scripts 
 you need to make sure that every machine has the most updated jars. Once you 
 have built on one of the nodes, you must rsync the Spark directory to the 
 rest of the nodes (see /root/spark-ec2/copy-dir).
 
 That said, I usually build it locally on my laptop and scp the assembly jar 
 to the cluster instead of building it there. The EC2 machines often take much 
 longer to build for some reason. Also, it's cumbersome to set up a proper IDE 
 there.
 
 -Andrew
 
 
 2015-06-19 19:11 GMT-07:00 Raghav Shankar raghav0110...@gmail.com:
 Thanks Andrew! Is this all I have to do when using the spark ec2 script to 
 set up a spark cluster? It seems to be getting an assembly jar that is not 
 from my project (perhaps from a Maven repo). Is there a way to make the ec2 
 script use the assembly jar that I created?
 
 Thanks,
 Raghav 
 
 
 On Friday, June 19, 2015, Andrew Or and...@databricks.com wrote:
 Hi Raghav,
 
 If you want to make changes to Spark and run your application with it, you 
 may follow these steps.
 
 1. git clone g...@github.com:apache/spark
 2. cd spark; build/mvn clean package -DskipTests [...]
 3. make local changes
 4. build/mvn package -DskipTests [...] (no need to clean again here)
 5. bin/spark-submit --master spark://[...] --class your.main.class your.jar
 
 No need to pass in extra --driver-java-options or --driver-extra-classpath as 
 others have suggested. When using spark-submit, the main jar comes from 
 assembly/target/scala-2.10, which is prepared through mvn package. You just 
 have to make sure that you re-package the assembly jar after each 
 modification.
 
 -Andrew
 
 2015-06-18 16:35 GMT-07:00 maxdml max...@cs.duke.edu:
 You can specify the jars of your application to be included with spark-submit
 with the /--jars/ switch.
 
 Otherwise, are you sure that your newly compiled spark jar assembly is in
 assembly/target/scala-2.10/?
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352p23400.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 



Re: Submitting Spark Applications using Spark Submit

2015-06-19 Thread Raghav Shankar
Thanks Andrew! Is this all I have to do when using the spark ec2 script to
set up a spark cluster? It seems to be getting an assembly jar that is not
from my project (perhaps from a Maven repo). Is there a way to make the ec2
script use the assembly jar that I created?

Thanks,
Raghav

On Friday, June 19, 2015, Andrew Or and...@databricks.com wrote:

 Hi Raghav,

 If you want to make changes to Spark and run your application with it, you
 may follow these steps.

 1. git clone g...@github.com:apache/spark
 2. cd spark; build/mvn clean package -DskipTests [...]
 3. make local changes
 4. build/mvn package -DskipTests [...] (no need to clean again here)
 5. bin/spark-submit --master spark://[...] --class your.main.class your.jar

 No need to pass in extra --driver-java-options or --driver-extra-classpath
 as others have suggested. When using spark-submit, the main jar comes from
 assembly/target/scala-2.10, which is prepared through mvn package. You
 just have to make sure that you re-package the assembly jar after each
 modification.

 -Andrew

 2015-06-18 16:35 GMT-07:00 maxdml max...@cs.duke.edu:

 You can specify the jars of your application to be included with
 spark-submit
 with the /--jars/ switch.

 Otherwise, are you sure that your newly compiled spark jar assembly is in
 assembly/target/scala-2.10/?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352p23400.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Implementing top() using treeReduce()

2015-06-17 Thread Raghav Shankar
I’ve implemented this in the suggested manner. When I build Spark and attach 
the new spark-core jar to my eclipse project, I am able to use the new method. 
In order to conduct the experiments I need to launch my app on a cluster. I am 
using EC2. When I set up my master and slaves using the EC2 setup scripts, Spark 
gets installed, but I think my custom-built spark-core jar is not being used. 
How do I set it up on EC2 so that my custom version of spark-core is used?

Thanks,
Raghav

 On Jun 9, 2015, at 7:41 PM, DB Tsai dbt...@dbtsai.com wrote:
 
 Having the following code in RDD.scala works for me. PS, in the following 
 code, I merge the smaller queue into the larger one. I wonder if this will help 
 performance. Let me know when you do the benchmark.
 def treeTakeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
   if (num == 0) {
     Array.empty
   } else {
     val mapRDDs = mapPartitions { items =>
       // Priority keeps the largest elements, so let's reverse the ordering.
       val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
       queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
       Iterator.single(queue)
     }
     if (mapRDDs.partitions.length == 0) {
       Array.empty
     } else {
       mapRDDs.treeReduce { (queue1, queue2) =>
         if (queue1.size > queue2.size) {
           queue1 ++= queue2
           queue1
         } else {
           queue2 ++= queue1
           queue2
         }
       }.toArray.sorted(ord)
     }
   }
 }
 
 def treeTop(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
   treeTakeOrdered(num)(ord.reverse)
 }
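 
 For reference, a minimal usage sketch (not from the original message), assuming Spark has 
 been rebuilt with the treeTakeOrdered/treeTop methods above:
 
 val rdd = sc.parallelize(1 to 1000000, 32)
 val top10Tree = rdd.treeTop(10)    // largest 10, merged tree-style via treeReduce
 val top10Plain = rdd.top(10)       // stock, reduce-based implementation
 assert(top10Tree.sameElements(top10Plain))
 
 Both calls should return the same array; only the way the per-partition queues are merged 
 differs.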
 
 
 Sincerely,
 
 DB Tsai
 --
 Blog: https://www.dbtsai.com
 PGP Key ID: 0xAF08DF8D
 
 On Tue, Jun 9, 2015 at 10:09 AM, raggy raghav0110...@gmail.com wrote:
 I am trying to implement top-k in scala within apache spark. I am aware that
 spark has a top action. But, top() uses reduce(). Instead, I would like to
 use treeReduce(). I am trying to compare the performance of reduce() and
 treeReduce().
 
 The main issue I have is that I cannot use these 2 lines of code which are
 used in the top() action within my Spark application.
 
 val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
 queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
 
 How can I go about implementing top() using treeReduce()?
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Implementing-top-using-treeReduce-tp23227.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 



Re: Implementing top() using treeReduce()

2015-06-17 Thread Raghav Shankar
So, would I add the assembly jar to just the master, or would I have to add 
it to all the slaves/workers too? 

Thanks,
Raghav

 On Jun 17, 2015, at 5:13 PM, DB Tsai dbt...@dbtsai.com wrote:
 
 You need to build the spark assembly with your modification and deploy
 it to the cluster.
 
 Sincerely,
 
 DB Tsai
 --
 Blog: https://www.dbtsai.com
 PGP Key ID: 0xAF08DF8D
 
 
 On Wed, Jun 17, 2015 at 5:11 PM, Raghav Shankar raghav0110...@gmail.com 
 wrote:
 I’ve implemented this in the suggested manner. When I build Spark and attach
 the new spark-core jar to my eclipse project, I am able to use the new
 method. In order to conduct the experiments I need to launch my app on a
 cluster. I am using EC2. When I set up my master and slaves using the EC2
 setup scripts, Spark gets installed, but I think my custom-built spark-core jar
 is not being used. How do I set it up on EC2 so that my custom version of
 spark-core is used?
 
 Thanks,
 Raghav
 
 On Jun 9, 2015, at 7:41 PM, DB Tsai dbt...@dbtsai.com wrote:
 
 Having the following code in RDD.scala works for me. PS, in the following
 code, I merge the smaller queue into the larger one. I wonder if this will help
 performance. Let me know when you do the benchmark.
 
 def treeTakeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
   if (num == 0) {
     Array.empty
   } else {
     val mapRDDs = mapPartitions { items =>
       // Priority keeps the largest elements, so let's reverse the ordering.
       val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
       queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
       Iterator.single(queue)
     }
     if (mapRDDs.partitions.length == 0) {
       Array.empty
     } else {
       mapRDDs.treeReduce { (queue1, queue2) =>
         if (queue1.size > queue2.size) {
           queue1 ++= queue2
           queue1
         } else {
           queue2 ++= queue1
           queue2
         }
       }.toArray.sorted(ord)
     }
   }
 }
 
 def treeTop(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  treeTakeOrdered(num)(ord.reverse)
 }
 
 
 
 Sincerely,
 
 DB Tsai
 --
 Blog: https://www.dbtsai.com
 PGP Key ID: 0xAF08DF8D
 
 On Tue, Jun 9, 2015 at 10:09 AM, raggy raghav0110...@gmail.com wrote:
 
 I am trying to implement top-k in scala within apache spark. I am aware
 that
 spark has a top action. But, top() uses reduce(). Instead, I would like to
 use treeReduce(). I am trying to compare the performance of reduce() and
 treeReduce().
 
 The main issue I have is that I cannot use these 2 lines of code which are
 used in the top() action within my Spark application.
 
 val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
 queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
 
 How can I go about implementing top() using treeReduce()?
 
 
 
 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Implementing-top-using-treeReduce-tp23227.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Submitting Spark Applications using Spark Submit

2015-06-16 Thread Raghav Shankar
To clarify, I am using the spark standalone cluster.

On Tuesday, June 16, 2015, Yanbo Liang yblia...@gmail.com wrote:

 If you run Spark on YARN, the simplest way is to replace the
 $SPARK_HOME/lib/spark-.jar with your own version of the Spark jar file and run
 your application.
 The spark-submit script will upload this jar to the YARN cluster automatically
 and then you can run your application as usual.
 It does not matter which version of Spark is in your YARN cluster.

 2015-06-17 10:42 GMT+08:00 Raghav Shankar raghav0110...@gmail.com:

 The documentation says spark.driver.userClassPathFirst can only be used
 in cluster mode. Does this mean I have to set the --deploy-mode option
 for spark-submit to cluster? Or can I still use the default client? My
 understanding is that even in the default deploy mode, spark still uses
 the slave machines I have on ec2.

 Also, the spark.driver.extraLibraryPath property mentions that I can
 provide a path for special libraries in the spark-submit command line
 options. Do my jar files in this path have to have the same name as the jar
 used by spark, or is it intelligent enough to identify that two jars are
 supposed to be the same thing? If they are supposed to have the same name,
 how can I find out the name I should use for my jar? E.g., if I just name my
 modified spark-core jar as spark.jar, put it in a lib folder, and provide
 the path of the folder to spark-submit, would that be enough to tell Spark
 to use that spark-core jar instead of the default?

 Thanks,
 Raghav

 On Jun 16, 2015, at 7:19 PM, Will Briggs wrbri...@gmail.com wrote:

 If this is research-only, and you don't want to have to worry about
 updating the jars installed by default on the cluster, you can add your
 custom Spark jar using the spark.driver.extraLibraryPath configuration
 property when running spark-submit, and then use the experimental 
 spark.driver.userClassPathFirst config to force it to use yours.

 See here for more details and options:
 https://spark.apache.org/docs/1.4.0/configuration.html

 On June 16, 2015, at 10:12 PM, Raghav Shankar raghav0110...@gmail.com wrote:

 I made the change so that I could implement top() using treeReduce(). A
 member on here suggested I make the change in RDD.scala to accomplish that.
 Also, this is for a research project, and not for commercial use.

 So, any advice on how I can get the spark submit to use my custom built
 jars would be very useful.

 Thanks,
 Raghav

 On Jun 16, 2015, at 6:57 PM, Will Briggs wrbri...@gmail.com wrote:

 In general, you should avoid making direct changes to the Spark source
 code. If you are using Scala, you can seamlessly blend your own methods on
 top of the base RDDs using implicit conversions.

 Regards,
 Will

 On June 16, 2015, at 7:53 PM, raggy raghav0110...@gmail.com wrote:

 I am trying to submit a spark application using the command line. I used
 the
 spark submit command for doing so. I initially setup my Spark application
 on
 Eclipse and have been making changes on there. I recently obtained my own
 version of the Spark source code and added a new method to RDD.scala. I
 created a new spark core jar using mvn, and I added it to my eclipse build
 path. My application ran perfectly fine.

 Now, I would like to submit it through the command line. I submitted my
 application like this:

 bin/spark-submit --master local[2] --class SimpleApp
 /Users/XXX/Desktop/spark2.jar

 The spark-submit command is within the spark project that I modified by
 adding new methods.
 When I do so, I get this error:

 java.lang.NoSuchMethodError:
 org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
 at SimpleApp$.main(SimpleApp.scala:12)
 at SimpleApp.main(SimpleApp.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at

 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 When I use spark submit, where does the jar come from? How do I make sure
 it uses the jars that I have built?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html
 Sent from the Apache Spark User List

Re: Submitting Spark Applications using Spark Submit

2015-06-16 Thread Raghav Shankar
I made the change so that I could implement top() using treeReduce(). A member 
on here suggested I make the change in RDD.scala to accomplish that. Also, this 
is for a research project, and not for commercial use. 

So, any advice on how I can get the spark submit to use my custom built jars 
would be very useful.

Thanks,
Raghav

 On Jun 16, 2015, at 6:57 PM, Will Briggs wrbri...@gmail.com wrote:
 
 In general, you should avoid making direct changes to the Spark source code. 
 If you are using Scala, you can seamlessly blend your own methods on top of 
 the base RDDs using implicit conversions.
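 
 As a hedged sketch of that suggestion (illustrative only, not the change that was
 actually made in RDD.scala): a treeTop-style method can be pinned onto any RDD with an
 implicit class, using only public API. BoundedPriorityQueue is private[spark], so this
 sketch keeps a plain sorted array of at most num elements per partition instead.
 
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 
 object TopViaTreeReduce {
   implicit class TopOps[T: ClassTag](rdd: RDD[T]) {
     def treeTop(num: Int, depth: Int = 2)(implicit ord: Ordering[T]): Array[T] = {
       rdd.mapPartitions { items =>
         // Keep only the num largest elements of this partition.
         Iterator.single(items.toArray.sorted(ord.reverse).take(num))
       }.treeReduce(
         // Merge two partial results and keep the num largest overall.
         (a, b) => (a ++ b).sorted(ord.reverse).take(num),
         depth
       )
     }
   }
 }
 
 With import TopViaTreeReduce._ in scope, rdd.treeTop(k) then runs on a stock Spark
 build, which sidesteps the deployment questions discussed in this thread.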
 
 Regards,
 Will
 
 On June 16, 2015, at 7:53 PM, raggy raghav0110...@gmail.com wrote:
 
 I am trying to submit a spark application using the command line. I used the
 spark submit command for doing so. I initially setup my Spark application on
 Eclipse and have been making changes on there. I recently obtained my own
 version of the Spark source code and added a new method to RDD.scala. I
 created a new spark core jar using mvn, and I added it to my eclipse build
 path. My application ran perfectly fine. 
 
 Now, I would like to submit it through the command line. I submitted my
 application like this:
 
 bin/spark-submit --master local[2] --class SimpleApp
 /Users/XXX/Desktop/spark2.jar
 
 The spark-submit command is within the spark project that I modified by
 adding new methods.
 When I do so, I get this error:
 
 java.lang.NoSuchMethodError:
 org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
   at SimpleApp$.main(SimpleApp.scala:12)
   at SimpleApp.main(SimpleApp.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
   at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
   at java.lang.reflect.Method.invoke(Method.java:597)
   at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
   at 
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 
 When I use spark submit, where does the jar come from? How do I make sure it
 uses the jars that I have built? 
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Submitting Spark Applications using Spark Submit

2015-06-16 Thread Raghav Shankar
The documentation says spark.driver.userClassPathFirst can only be used in 
cluster mode. Does this mean I have to set the --deploy-mode option for 
spark-submit to cluster? Or can I still use the default client? My 
understanding is that even in the default deploy mode, spark still uses the 
slave machines I have on ec2. 

Also, the spark.driver.extraLibraryPath property mentions that I can provide a 
path for special libraries in the spark-submit command line options. Do my jar 
files in this path have to have the same name as the jar used by spark, or is it 
intelligent enough to identify that two jars are supposed to be the same thing? 
If they are supposed to have the same name, how can I find out the name I should 
use for my jar? E.g., if I just name my modified spark-core jar as spark.jar, 
put it in a lib folder, and provide the path of the folder to spark-submit, would 
that be enough to tell Spark to use that spark-core jar instead of the default?

Thanks,
Raghav

 On Jun 16, 2015, at 7:19 PM, Will Briggs wrbri...@gmail.com wrote:
 
 If this is research-only, and you don't want to have to worry about updating 
 the jars installed by default on the cluster, you can add your custom Spark 
 jar using the spark.driver.extraLibraryPath configuration property when 
 running spark-submit, and then use the experimental  
 spark.driver.userClassPathFirst config to force it to use yours.
 
 See here for more details and options: 
 https://spark.apache.org/docs/1.4.0/configuration.html
 
 On June 16, 2015, at 10:12 PM, Raghav Shankar raghav0110...@gmail.com wrote:
 
 I made the change so that I could implement top() using treeReduce(). A 
 member on here suggested I make the change in RDD.scala to accomplish that. 
 Also, this is for a research project, and not for commercial use. 
 
 So, any advice on how I can get the spark submit to use my custom built jars 
 would be very useful.
 
 Thanks,
 Raghav
 
 On Jun 16, 2015, at 6:57 PM, Will Briggs wrbri...@gmail.com wrote:
 
 In general, you should avoid making direct changes to the Spark source code. 
 If you are using Scala, you can seamlessly blend your own methods on top of 
 the base RDDs using implicit conversions.
 
 Regards,
 Will
 
 On June 16, 2015, at 7:53 PM, raggy raghav0110...@gmail.com wrote:
 
 I am trying to submit a spark application using the command line. I used the
 spark submit command for doing so. I initially setup my Spark application on
 Eclipse and have been making changes on there. I recently obtained my own
 version of the Spark source code and added a new method to RDD.scala. I
 created a new spark core jar using mvn, and I added it to my eclipse build
 path. My application ran perfectly fine. 
 
 Now, I would like to submit it through the command line. I submitted my
 application like this:
 
 bin/spark-submit --master local[2] --class SimpleApp
 /Users/XXX/Desktop/spark2.jar
 
 The spark-submit command is within the spark project that I modified by
 adding new methods.
 When I do so, I get this error:
 
 java.lang.NoSuchMethodError:
 org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
  at SimpleApp$.main(SimpleApp.scala:12)
  at SimpleApp.main(SimpleApp.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  at java.lang.reflect.Method.invoke(Method.java:597)
  at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
  at 
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 
 When I use spark submit, where does the jar come from? How do I make sure it
 uses the jars that I have built? 
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 



Re: Different Sorting RDD methods in Apache Spark

2015-06-09 Thread Raghav Shankar
Thank you for you responses!

You mention that it only works as long as the data fits on a single
machine. What I am trying to do is receive the sorted contents of my
dataset. For this to be possible, the entire dataset should be able to fit
on a single machine. Are you saying that sorting the entire data and
collecting it on the driver node is not a typical use case? If I want to do
this using sortBy(), I would first call sortBy() followed by a collect().
Collect() would involve gathering all the data on a single machine as well.

Thanks,
Raghav

On Tuesday, June 9, 2015, Mark Hamstra m...@clearstorydata.com wrote:

 Correct.  Trading away scalability for increased performance is not an
 option for the standard Spark API.

 On Tue, Jun 9, 2015 at 3:05 AM, Daniel Darabos 
 daniel.dara...@lynxanalytics.com wrote:

 It would be even faster to load the data on the driver and sort it there
 without using Spark :). Using reduce() is cheating, because it only works
 as long as the data fits on one machine. That is not the targeted use case
 of a distributed computation system. You can repeat your test with more
 data (that doesn't fit on one machine) to see what I mean.
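 
 As a minimal sketch of the contrast being described (illustrative only; it reuses the
 data.txt path, 32 partitions, and the 25th CSV column from the code later in this
 thread):
 
 val rdd = sc.textFile("/data.txt", 32)
 // Scalable: the sorted result stays distributed across the cluster.
 val sortedRdd = rdd.sortBy(line => line.split(",")(24).toDouble)
 // Not scalable: everything is pulled onto one machine, which is what the
 // mapPartitions-plus-reduce approach effectively does as well.
 val sortedLocal = rdd.collect().sortBy(line => line.split(",")(24).toDouble)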

 On Tue, Jun 9, 2015 at 8:30 AM, raggy raghav0110...@gmail.com wrote:

 For a research project, I tried sorting the elements in an RDD. I did
 this using two different approaches.

 In the first method, I applied a mapPartitions() function on the RDD, so
 that it would sort the contents of the RDD, and provide a result RDD that
 contains the sorted list as the only record in the RDD. Then, I applied a
 reduce function which basically merges sorted lists.

 I ran these experiments on an EC2 cluster containing 30 nodes. I set it
 up
 using the spark ec2 script. The data file was stored in HDFS.

 In the second approach I used the sortBy method in Spark.

 I performed these operations on the US census data (100MB) found here.

 A single line looks like this:

 9, Not in universe, 0, 0, Children, 0, Not in universe, Never married,
 Not
 in universe or children, Not in universe, White, All other, Female, Not
 in
 universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler,
 Not
 in universe, Not in universe, Child 18 never marr not in subfamily,
 Child
 under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not
 in
 universe, 0, Both parents present, United-States, United-States,
 United-States, Native- Born in the United States, 0, Not in universe, 0,
 0,
 94, - 5.
 I sorted based on the 25th value in the CSV. In this line that is
 1758.14.

 I noticed that sortBy performs worse than the other method. Is this the
 expected scenario? If it is, why wouldn't the mapPartitions() and
 reduce()
 be the default sorting approach?

 Here is my implementation

 public static void sortBy(JavaSparkContext sc){
     JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
     long start = System.currentTimeMillis();
     rdd.sortBy(new Function<String, Double>(){

         @Override
         public Double call(String v1) throws Exception {
             String[] arr = v1.split(",");
             return Double.parseDouble(arr[24]);
         }
     }, true, 9).collect();
     long end = System.currentTimeMillis();
     System.out.println("SortBy: " + (end - start));
 }

 public static void sortList(JavaSparkContext sc){
     JavaRDD<String> rdd = sc.textFile("/data.txt", 32); // parallelize(l, 8);
     long start = System.currentTimeMillis();
     JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 =
         rdd.mapPartitions(new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>(){

             @Override
             public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
                     throws Exception {
                 LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>();
                 while(t.hasNext()){
                     String s = t.next();
                     String[] arr1 = s.split(",");
                     Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]), s);
                     lines.add(t1);
                 }
                 Collections.sort(lines, new IncomeComparator());
                 LinkedList<LinkedList<Tuple2<Double, String>>> list = new
                     LinkedList<LinkedList<Tuple2<Double, String>>>();
                 list.add(lines);
                 return list;
             }



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Different-Sorting-RDD-methods-in-Apache-Spark-tp23214.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 

Re: TreeReduce Functionality in Spark

2015-06-04 Thread Raghav Shankar
Hey Reza,

Thanks for your response!

Your response clarifies some of my initial thoughts. However, what I don't
understand is how the depth of the tree is used to determine how many
intermediate reducers there will be, and how many partitions are sent to
the intermediate reducers. Could you provide some insight into this?

Thanks,
Raghav

On Thursday, June 4, 2015, Reza Zadeh r...@databricks.com wrote:

 In a regular reduce, all partitions have to send their reduced value to a
 single machine, and that machine can become a bottleneck.

 In a treeReduce, the partitions talk to each other in a logarithmic number
 of rounds. Imagine a binary tree that has all the partitions at its leaves
 and the root will contain the final reduced value. This way there is no
 single bottleneck machine.

 It remains to decide the number of children each node should have and how
 deep the tree should be, which is some of the logic in the method you
 pasted.
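 
 As a minimal illustration of the two call patterns (a sketch, not from the original
 message):
 
 val rdd = sc.parallelize(1 to 1000000, 32)
 val total = rdd.reduce(_ + _)                    // all 32 partial sums go to one reducer
 val totalTree = rdd.treeReduce(_ + _, depth = 2) // partial sums are combined in rounds first
 
 Both return the same value; only the communication pattern differs.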

 On Wed, Jun 3, 2015 at 7:10 PM, raggy raghav0110...@gmail.com wrote:

 I am trying to understand what the treeReduce function for an RDD does,
 and
 how it is different from the normal reduce function. My current
 understanding is that treeReduce tries to split up the reduce into
 multiple
 steps. We do a partial reduce on different nodes, and then a final reduce
 is
 done to get the final result. Is this correct? If so, I guess what I am
 curious about is, how does spark decide how many nodes will be on each
 level, and how many partitions will be sent to a given node?

 The bulk of the implementation is within this function:

 partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
   .getOrElse(throw new UnsupportedOperationException("empty collection"))

 The above function is expanded to

 val cleanSeqOp = context.clean(seqOp)
 val cleanCombOp = context.clean(combOp)
 val aggregatePartition =
   (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
 var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
 var numPartitions = partiallyAggregated.partitions.length
 val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
 // If creating an extra level doesn't help reduce
 // the wall-clock time, we stop tree aggregation.
 while (numPartitions > scale + numPartitions / scale) {
   numPartitions /= scale
   val curNumPartitions = numPartitions
   partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
     (i, iter) => iter.map((i % curNumPartitions, _))
   }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
 }
 partiallyAggregated.reduce(cleanCombOp)

 I am completely lost about what is happening in this function. I would
 greatly appreciate some sort of explanation.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: TreeReduce Functionality in Spark

2015-06-04 Thread Raghav Shankar
Hey DB,

Thanks for the reply!

I still don't think this answers my question. For example, if I have a
top() action being executed and I have 32 workers (32 partitions), and I
choose a depth of 4, what does the overlay of intermediate reducers look
like? How many reducers are there, excluding the master and the workers? How
many partitions get sent to each of these intermediate reducers? Does this
number vary at each level?

Thanks!

On Thursday, June 4, 2015, DB Tsai dbt...@dbtsai.com wrote:

 By default, the depth of the tree is 2. Each partition will be one node.
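 
 As a rough, illustrative trace of the treeAggregate logic pasted below, for the
 32-partition, depth-4 case asked about above (assuming integer division, as in the code):
 
 var numPartitions = 32
 val depth = 4
 val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) // 3
 while (numPartitions > scale + numPartitions / scale) {
   numPartitions /= scale // 32 -> 10 -> 3
 }
 println(numPartitions) // 3 partitions remain for the final reduce on the driver
 
 So with those settings there would be two intermediate reduceByKey rounds (32 -> 10 -> 3
 partitions) before the final reduce, under that reading of the code.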

 Sincerely,

 DB Tsai
 ---
 Blog: https://www.dbtsai.com


 On Thu, Jun 4, 2015 at 10:46 AM, Raghav Shankar raghav0110...@gmail.com wrote:
  Hey Reza,
 
  Thanks for your response!
 
  Your response clarifies some of my initial thoughts. However, what I
 don't
  understand is how the depth of the tree is used to identify how many
  intermediate reducers there will be, and how many partitions are sent to
 the
  intermediate reducers. Could you provide some insight into this?
 
  Thanks,
  Raghav
 
   On Thursday, June 4, 2015, Reza Zadeh r...@databricks.com wrote:
 
  In a regular reduce, all partitions have to send their reduced value to
 a
  single machine, and that machine can become a bottleneck.
 
  In a treeReduce, the partitions talk to each other in a logarithmic
 number
  of rounds. Imagine a binary tree that has all the partitions at its
 leaves
  and the root will contain the final reduced value. This way there is no
  single bottleneck machine.
 
  It remains to decide the number of children each node should have and
 how
  deep the tree should be, which is some of the logic in the method you
  pasted.
 
   On Wed, Jun 3, 2015 at 7:10 PM, raggy raghav0110...@gmail.com wrote:
 
  I am trying to understand what the treeReduce function for an RDD does,
  and
  how it is different from the normal reduce function. My current
  understanding is that treeReduce tries to split up the reduce into
  multiple
  steps. We do a partial reduce on different nodes, and then a final
 reduce
  is
  done to get the final result. Is this correct? If so, I guess what I am
  curious about is, how does spark decide how many nodes will be on each
  level, and how many partitions will be sent to a given node?
 
  The bulk of the implementation is within this function:
 
   partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
     .getOrElse(throw new UnsupportedOperationException("empty collection"))
 
  The above function is expanded to
 
   val cleanSeqOp = context.clean(seqOp)
   val cleanCombOp = context.clean(combOp)
   val aggregatePartition =
     (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
   var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
   var numPartitions = partiallyAggregated.partitions.length
   val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
   // If creating an extra level doesn't help reduce
   // the wall-clock time, we stop tree aggregation.
   while (numPartitions > scale + numPartitions / scale) {
     numPartitions /= scale
     val curNumPartitions = numPartitions
     partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
       (i, iter) => iter.map((i % curNumPartitions, _))
     }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
   }
   partiallyAggregated.reduce(cleanCombOp)
 
  I am completely lost about what is happening in this function. I would
  greatly appreciate some sort of explanation.
 
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
  -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
 
 
 



Re: Task result in Spark Worker Node

2015-04-17 Thread Raghav Shankar
Hey Imran, 

 Thanks for the great explanation! This cleared up a lot of things for me. I am 
actually trying to utilize some of the features within Spark for a system I am 
developing. I am currently working on developing a subsystem that can be 
integrated within Spark and other Big Data solutions. In order to integrate it 
within Spark, I am trying to utilize the rdds and functions provided to the 
reduce method on my system. My system is developed in Scala and Java. In Spark, 
I have seen that the function provided to the reduce method, along with the 
RDD, gets serialized and sent to the worker nodes. The worker nodes are able to 
deserialize them and then execute the task on them. I see this happening in 
ResultTask.scala. When I try to do something similar, I get exceptions. The 
system I am developing has Spark jars in its build path, so it is able to 
create a SparkContext etc. 

When I do, 

val bytes = closureSerializer.serialize((rdd, func): AnyRef).array() // (similar to DAGScheduler.scala)
val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
  ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader)
println(func2(context, rdd2.iterator(rdd2.partitions(1), context)));

I get the proper result and can print it out. 

But when I involve the network by serializing the data, using the network to 
send it to a different program, then deserialize the data and use the function, 
I get the following error:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
    at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31)
    at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30)
    at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
    at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
    at SimpleApp$.net(SimpleApp.scala:71)
    at SimpleApp$.main(SimpleApp.scala:76)
    at SimpleApp.main(SimpleApp.scala)

I have also made sure that I am adding the class file of the program that is 
sending the serialized data to the bin folder of the program that is receiving 
the data. I’m not sure what I am doing wrong. I’ve done the serialization and 
creation of the function similar to how Spark does it. I created another reduce 
function like this. When implemented this way, it prints out the result of 
func2 properly. But when I involve the network by sending the serialized data 
to another program, I get the above exception. 

   def reduceMod(f: (Integer, Integer) => Integer): Integer = {
     val reducePartition: Iterator[Integer] => Option[Integer] = iter => {
       if (iter.hasNext) {
         Some(iter.reduceLeft(f))
       } else {
         None
       }
     }
     val processFunc = (context: TaskContext, iter: Iterator[Integer]) => reducePartition(iter)
     val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) => Int]
     context = new TaskContextImpl(stageId = 1, partitionId = 1,
       taskAttemptId = 1, attemptNumber = 1, runningLocally = false)
     println(func.getClass.getName);
     println(func(context, rdd.iterator(rdd.partitions(1), context)));
     val bb = closureSerializer.serialize((rdd, func): AnyRef).array()
     val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
       ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader)
     println(func2(context, rdd3.iterator(rdd3.partitions(1), context)));
     1
   }
 
I was wondering if you had any ideas on what I am doing wrong, or how I can 
properly send the serialized version of the RDD and function to my other 
program. My thought is that I might need to add more jars to the build path, 
but I have no clue whether that's the issue, or what jars I need to add. 

Thanks,
Raghav

 On Apr 13, 2015, at 10:22 PM, Imran Rashid iras...@cloudera.com wrote:
 
 On the worker side, it all happens in Executor.  The task result is computed 
 here:
 
 https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210
 
 then its serialized along with some other goodies, and finally sent back to 
 the driver here:
 
 https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255
 
 What happens on the driver is quite a bit more complicated, and involves a 
 number of spots in the code, but at least to get you started, the results are 
 received here:
 
 

Re: Task result in Spark Worker Node

2015-04-17 Thread Raghav Shankar
My apologies, I had pasted the wrong exception trace in the previous email. 
Here is the actual exception that I am receiving. 

Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154)
    at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

 On Apr 17, 2015, at 2:30 AM, Raghav Shankar raghav0110...@gmail.com wrote:
 
 Hey Imran, 
 
  Thanks for the great explanation! This cleared up a lot of things for me. I 
 am actually trying to utilize some of the features within Spark for a system 
 I am developing. I am currently working on developing a subsystem that can be 
 integrated within Spark and other Big Data solutions. In order to integrate 
 it within Spark, I am trying to utilize the rdds and functions provided to 
 the reduce method on my system. My system is developed in Scala and Java. In 
 Spark, I have seen that the function provided to the reduce method, along 
 with the RDD, gets serialized and sent to the worker nodes. The worker nodes 
 are able to deserialize them and then execute the task on them. I see this 
 happening in ResultTask.scala. When I try to do something similar, I get 
 exceptions. The system I am developing has Spark jars in its build path, so 
 it is able to create a SparkContext etc. 
 
 When I do, 
 
 val bytes = closureSerializer.serialize((rdd, func): AnyRef).array() // (similar to DAGScheduler.scala)
 val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
   ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader)
 println(func2(context, rdd2.iterator(rdd2.partitions(1), context)));
 
 I get the proper result and can print it out. 
 
 But when I involve the network by serializing the data, using the network to 
 send it to a different program, then deserialize the data and use the 
 function, I get the following error:
 
 Exception in thread "main" java.lang.NullPointerException
     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
     at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31)
     at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30)
     at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
     at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
     at SimpleApp$.net(SimpleApp.scala:71)
     at SimpleApp$.main(SimpleApp.scala:76)
     at SimpleApp.main(SimpleApp.scala)
 
 I have also made sure that I am adding the class file of the program that is 
 sending the serialized data to the bin folder of the program that is 
 receiving the data. I’m not sure what I am doing wrong. I’ve done the 
 serialization and creation of the function similar to how Spark does it. I 
 created another reduce function like this. When implemented this way, it 
 prints out the result of func2 properly. But when I involve the network by 
 sending the serialized data to another program, I get the above exception. 
 
    def reduceMod(f: (Integer, Integer) => Integer): Integer = {
      val reducePartition: Iterator[Integer] => Option[Integer] = iter => {
        if (iter.hasNext) {
          Some(iter.reduceLeft(f))
        } else {
          None
        }
      }
      val processFunc = (context: TaskContext, iter: Iterator[Integer]) => reducePartition(iter)
      val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) => Int]
      context = new TaskContextImpl(stageId = 1, partitionId = 1,
        taskAttemptId = 1, attemptNumber = 1, runningLocally = false)
      println(func.getClass.getName);
      println(func(context, rdd.iterator(rdd.partitions(1), context)));
      val bb = closureSerializer.serialize((rdd, func): AnyRef).array()
      val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
        ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader)
      println(func2(context, rdd3.iterator(rdd3.partitions(1), context)));
      1
    }
  
 I was wondering if you had any ideas on what I am doing wrong, or how I can 
 properly send the serialized version of the RDD and function to my other 
 program. My thought is that I might need to add more jars to the build path, 
 but I have no clue whether that's the issue, or what jars I need to add. 
 
 Thanks,
 Raghav
 
 On Apr 13, 2015, at 10:22 PM, Imran Rashid iras...@cloudera.com wrote:
 
 On the worker side, it all happens in Executor.  The task result is computed 
 here:
 
 https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210

Re: Sending RDD object over the network

2015-04-06 Thread Raghav Shankar
Hey Akhil,

 Thanks for your response! No, I am not expecting to receive the values
themselves. I am just trying to receive the RDD object on my second Spark
application. However, I get a NPE when I try to use the object within my
second program. Would you know how I can properly send the RDD object to my
second program?

Thanks,
Raghav

On Mon, Apr 6, 2015 at 3:08 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Are you expecting to receive the values 1 to 100 in your second program?

 RDD is just an abstraction; you would need to do something like:

 num.foreach(x => send(x))
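 
 As a minimal sketch of that idea (illustrative only; the host, port, and RDD are taken
 from the code quoted below, send() above is a hypothetical helper, and the receiving side
 would need to accept one connection per partition):
 
 import java.io.ObjectOutputStream
 import java.net.Socket
 
 // Ship the values themselves from the executors, not the RDD object.
 sc.parallelize(1 to 100, 2).foreachPartition { part =>
   val socket = new Socket("127.0.0.1", 8000)
   val out = new ObjectOutputStream(socket.getOutputStream)
   part.foreach(x => out.writeObject(Int.box(x)))
   out.close()
   socket.close()
 }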


 Thanks
 Best Regards

 On Mon, Apr 6, 2015 at 1:56 AM, raggy raghav0110...@gmail.com wrote:

 For a class project, I am trying to utilize 2 spark Applications
 communicate
 with each other by passing an RDD object that was created from one
 application to another Spark application. The first application is
 developed
 in Scala and creates an RDD and sends it to the 2nd application over the
 network as follows:

 val logFile = "../../spark-1.3.0/README.md" // Should be some file on your system
 val conf = new SparkConf();
 conf.setAppName("Simple Application").setMaster("local[2]")
 val sc = new SparkContext(conf)
 val nums = sc.parallelize(1 to 100, 2).toJavaRDD();
 val s = new Socket("127.0.0.1", 8000);
 val objectOutput = new ObjectOutputStream(s.getOutputStream());
 objectOutput.writeObject(nums);
 s.close();
 The second Spark application is a Java application, which tries to receive
 the RDD object and then perform some operations on it. At the moment, I am
 trying to see if I have properly obtained the object.

 ServerSocket listener = null;
 Socket client;

 try{
     listener = new ServerSocket(8000);
 }catch(Exception e){
     e.printStackTrace();
 }
 System.out.println("Listening");
 try{
     client = listener.accept();
     ObjectInputStream objectInput = new ObjectInputStream(client.getInputStream());
     Object object = (JavaRDD) objectInput.readObject();
     JavaRDD tmp = (JavaRDD) object;

     if(tmp != null){
         System.out.println(tmp.getStorageLevel().toString());
         List<Partition> p = tmp.partitions();
     }
     else{
         System.out.println("variable is null");
     }

 }catch(Exception e){
     e.printStackTrace();
 }
 The output I get is:

 StorageLevel(false, false, false, false, 1)
 java.lang.NullPointerException
     at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154)
     at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
     at scala.Option.getOrElse(Option.scala:120)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
     at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:56)
     at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
     at SimpleApp.main(SimpleApp.java:35)
 So, System.out.println(tmp.getStorageLevel().toString()); prints out
 properly. But List<Partition> p = tmp.partitions(); throws the
 NullPointerException. I can't seem to figure out why this is happening.

 In a nutshell, I am basically trying to create an RDD object in one Spark
 application and then send the object to another application. After
 receiving
 the object I try to make sure I received it properly by accessing its
 methods. Invoking the partitions() method in the original Spark
 application
 does not throw any errors either. I would greatly appreciate any
 suggestion
 on how I can solve my problem, or an alternative solution for what I am
 trying to accomplish.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Sending-RDD-object-over-the-network-tp22382.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org