Re: Submitting Spark Applications using Spark Submit
) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Also, the above error says: connection refused to ec2-XXX.compute-1.amazonaws.com/10.165.103.16:7077. I don't understand where it gets the 10.165.103.16 from; I never specify that in the master URL command-line parameter. Any ideas on what I might be doing wrong?

On Jun 19, 2015, at 7:19 PM, Andrew Or <and...@databricks.com> wrote:

Hi Raghav,

I'm assuming you're using standalone mode. When using the Spark EC2 scripts you need to make sure that every machine has the most updated jars. Once you have built on one of the nodes, you must rsync the Spark directory to the rest of the nodes (see /root/spark-ec2/copy-dir). That said, I usually build Spark locally on my laptop and scp the assembly jar to the cluster instead of building it there. The EC2 machines often take much longer to build for some reason. It is also cumbersome to set up a proper IDE there.

-Andrew

2015-06-19 19:11 GMT-07:00 Raghav Shankar <raghav0110...@gmail.com>:

Thanks Andrew! Is this all I have to do when using the Spark EC2 script to set up a Spark cluster? It seems to be getting an assembly jar that is not from my project (perhaps from a Maven repo). Is there a way to make the EC2 script use the assembly jar that I created?

Thanks,
Raghav

On Friday, June 19, 2015, Andrew Or <and...@databricks.com> wrote:

Hi Raghav,

If you want to make changes to Spark and run your application with it, you may follow these steps:

1. git clone g...@github.com:apache/spark
2. cd spark; build/mvn clean package -DskipTests [...]
3. make local changes
4. build/mvn package -DskipTests [...] (no need to clean again here)
5. bin/spark-submit --master spark://[...] --class your.main.class your.jar

No need to pass in extra --driver-java-options or --driver-extra-classpath as others have suggested. When using spark-submit, the main jar comes from assembly/target/scala-2.10, which is prepared through mvn package. You just have to make sure that you re-package the assembly jar after each modification.

-Andrew

2015-06-18 16:35 GMT-07:00 maxdml <max...@cs.duke.edu>:

You can specify the jars of your application to be included with spark-submit using the --jars switch. Otherwise, are you sure that your newly compiled Spark assembly jar is in assembly/target/scala-2.10/?
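In other words, on a spark-ec2 cluster the rebuild-and-deploy cycle for each modification would look roughly like this (a sketch combining Andrew's steps with the copy-dir script he mentions; it assumes Spark lives in /root/spark on the master, as the EC2 scripts set it up):

1. build/mvn package -DskipTests (run on the master, in /root/spark)
2. /root/spark-ec2/copy-dir /root/spark (rsyncs the directory, including the freshly packaged assembly jar, to every slave)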
Re: Submitting Spark Applications using Spark Submit
Thanks Andrew! Is this all I have to do when using the Spark EC2 script to set up a Spark cluster? It seems to be getting an assembly jar that is not from my project (perhaps from a Maven repo). Is there a way to make the EC2 script use the assembly jar that I created?

Thanks,
Raghav

On Friday, June 19, 2015, Andrew Or <and...@databricks.com> wrote:

Hi Raghav,

If you want to make changes to Spark and run your application with it, you may follow these steps:

1. git clone g...@github.com:apache/spark
2. cd spark; build/mvn clean package -DskipTests [...]
3. make local changes
4. build/mvn package -DskipTests [...] (no need to clean again here)
5. bin/spark-submit --master spark://[...] --class your.main.class your.jar

No need to pass in extra --driver-java-options or --driver-extra-classpath as others have suggested. When using spark-submit, the main jar comes from assembly/target/scala-2.10, which is prepared through mvn package. You just have to make sure that you re-package the assembly jar after each modification.

-Andrew

2015-06-18 16:35 GMT-07:00 maxdml <max...@cs.duke.edu>:

You can specify the jars of your application to be included with spark-submit using the --jars switch. Otherwise, are you sure that your newly compiled Spark assembly jar is in assembly/target/scala-2.10/?
Re: Implementing top() using treeReduce()
I've implemented this in the suggested manner. When I build Spark and attach the new spark-core jar to my Eclipse project, I am able to use the new method. In order to conduct the experiments I need to launch my app on a cluster. I am using EC2. When I set up my master and slaves using the EC2 setup scripts, Spark is set up, but I think my custom-built spark-core jar is not being used. How do I set things up on EC2 so that my custom version of spark-core is used?

Thanks,
Raghav

On Jun 9, 2015, at 7:41 PM, DB Tsai <dbt...@dbtsai.com> wrote:

Having the following code in RDD.scala works for me. PS, in the following code, I merge the smaller queue into the larger one. I wonder if this will help performance. Let me know when you do the benchmark.

def treeTakeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // Priority keeps the largest elements, so let's reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      mapRDDs.treeReduce { (queue1, queue2) =>
        if (queue1.size > queue2.size) {
          queue1 ++= queue2
          queue1
        } else {
          queue2 ++= queue1
          queue2
        }
      }.toArray.sorted(ord)
    }
  }
}

def treeTop(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  treeTakeOrdered(num)(ord.reverse)
}

Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D

On Tue, Jun 9, 2015 at 10:09 AM, raggy <raghav0110...@gmail.com> wrote:

I am trying to implement top-k in Scala within Apache Spark. I am aware that Spark has a top action, but top() uses reduce(). Instead, I would like to use treeReduce(), so that I can compare the performance of reduce() and treeReduce(). The main issue I have is that I cannot use these two lines of code, which are used in the top() action, within my Spark application:

val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)

How can I go about implementing top() using treeReduce()?
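For reference, a minimal driver that exercises the patched method might look like the sketch below. It assumes the modified RDD.scala above has been compiled into the assembly jar that is actually on the classpath; treeTop is the custom addition from this thread and does not exist in stock Spark.

import org.apache.spark.{SparkConf, SparkContext}

object TreeTopDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TreeTopDemo"))
    val nums = sc.parallelize(1 to 1000000, 32)
    // Resolves only against the custom-built jar; against a stock jar this
    // line throws NoSuchMethodError at runtime.
    println(nums.treeTop(10).mkString(", "))
    sc.stop()
  }
}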
Re: Implementing top() using treeReduce()
So, would I add the assembly jar to just the master, or would I have to add it to all the slaves/workers too?

Thanks,
Raghav

On Jun 17, 2015, at 5:13 PM, DB Tsai <dbt...@dbtsai.com> wrote:

You need to build the Spark assembly with your modification and deploy it into the cluster.

Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D

On Wed, Jun 17, 2015 at 5:11 PM, Raghav Shankar <raghav0110...@gmail.com> wrote:

I've implemented this in the suggested manner. When I build Spark and attach the new spark-core jar to my Eclipse project, I am able to use the new method. In order to conduct the experiments I need to launch my app on a cluster. I am using EC2. When I set up my master and slaves using the EC2 setup scripts, Spark is set up, but I think my custom-built spark-core jar is not being used. How do I set things up on EC2 so that my custom version of spark-core is used?

Thanks,
Raghav

On Jun 9, 2015, at 7:41 PM, DB Tsai <dbt...@dbtsai.com> wrote:

Having the following code in RDD.scala works for me. PS, in the following code, I merge the smaller queue into the larger one. I wonder if this will help performance. Let me know when you do the benchmark.

def treeTakeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // Priority keeps the largest elements, so let's reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      mapRDDs.treeReduce { (queue1, queue2) =>
        if (queue1.size > queue2.size) {
          queue1 ++= queue2
          queue1
        } else {
          queue2 ++= queue1
          queue2
        }
      }.toArray.sorted(ord)
    }
  }
}

def treeTop(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  treeTakeOrdered(num)(ord.reverse)
}

Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D

On Tue, Jun 9, 2015 at 10:09 AM, raggy <raghav0110...@gmail.com> wrote:

I am trying to implement top-k in Scala within Apache Spark. I am aware that Spark has a top action, but top() uses reduce(). Instead, I would like to use treeReduce(), so that I can compare the performance of reduce() and treeReduce(). The main issue I have is that I cannot use these two lines of code, which are used in the top() action, within my Spark application:

val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)

How can I go about implementing top() using treeReduce()?
Re: Submitting Spark Applications using Spark Submit
To clarify, I am using the Spark standalone cluster.

On Tuesday, June 16, 2015, Yanbo Liang <yblia...@gmail.com> wrote:

If you run Spark on YARN, the simplest way is to replace $SPARK_HOME/lib/spark-*.jar with your own version of the Spark jar file and run your application. The spark-submit script will upload this jar to the YARN cluster automatically, and then you can run your application as usual. It does not care about which version of Spark is in your YARN cluster.

2015-06-17 10:42 GMT+08:00 Raghav Shankar <raghav0110...@gmail.com>:

The documentation says spark.driver.userClassPathFirst can only be used in cluster mode. Does this mean I have to set the --deploy-mode option for spark-submit to cluster, or can I still use the default client mode? My understanding is that even in the default deploy mode, Spark still uses the slave machines I have on EC2.

Also, the spark.driver.extraLibraryPath property mentions that I can provide a path for special libraries on the spark-submit command line. Do my jar files in this path have to have the same name as the jar used by Spark, or is it intelligent enough to identify that two jars are supposed to be the same thing? If they have to have the same name, how can I find out the name I should use for my jar? E.g., if I just name my modified spark-core jar spark.jar, put it in a lib folder, and provide the path of the folder to spark-submit, would that be enough to tell Spark to use that spark-core jar instead of the default?

Thanks,
Raghav

On Jun 16, 2015, at 7:19 PM, Will Briggs <wrbri...@gmail.com> wrote:

If this is research-only, and you don't want to have to worry about updating the jars installed by default on the cluster, you can add your custom Spark jar using the spark.driver.extraLibraryPath configuration property when running spark-submit, and then use the experimental spark.driver.userClassPathFirst config to force it to use yours. See here for more details and options: https://spark.apache.org/docs/1.4.0/configuration.html

On June 16, 2015, at 10:12 PM, Raghav Shankar <raghav0110...@gmail.com> wrote:

I made the change so that I could implement top() using treeReduce(). A member on here suggested I make the change in RDD.scala to accomplish that. Also, this is for a research project, and not for commercial use. So, any advice on how I can get spark-submit to use my custom-built jars would be very useful.

Thanks,
Raghav

On Jun 16, 2015, at 6:57 PM, Will Briggs <wrbri...@gmail.com> wrote:

In general, you should avoid making direct changes to the Spark source code. If you are using Scala, you can seamlessly blend your own methods on top of the base RDDs using implicit conversions.

Regards,
Will

On June 16, 2015, at 7:53 PM, raggy <raghav0110...@gmail.com> wrote:

I am trying to submit a Spark application using the command line, via the spark-submit command. I initially set up my Spark application in Eclipse and have been making changes there. I recently obtained my own version of the Spark source code and added a new method to RDD.scala. I created a new spark-core jar using mvn, added it to my Eclipse build path, and my application ran perfectly fine. Now, I would like to submit it through the command line.

I submitted my application like this:

bin/spark-submit --master local[2] --class SimpleApp /Users/XXX/Desktop/spark2.jar

The spark-submit command is run from within the Spark project that I modified by adding new methods. When I do so, I get this error:

java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
    at SimpleApp$.main(SimpleApp.scala:12)
    at SimpleApp.main(SimpleApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

When I use spark-submit, where does the jar come from? How do I make sure it uses the jars that I have built?
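The NoSuchMethodError above is thrown at the call site of the custom method, which means the stock assembly jar, not the custom-built one, is on the classpath at runtime. For reference, a minimal SimpleApp along the lines described in this thread might look like the following sketch (treeTop is the custom addition to RDD.scala and is not part of stock Spark):

import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val nums = sc.parallelize(1 to 100000, 8)
    // This is the line that fails with NoSuchMethodError when the stock
    // spark-core is loaded instead of the locally modified build.
    println(nums.treeTop(5).mkString(", "))
    sc.stop()
  }
}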
Re: Submitting Spark Applications using Spark Submit
I made the change so that I could implement top() using treeReduce(). A member on here suggested I make the change in RDD.scala to accomplish that. Also, this is for a research project, and not for commercial use. So, any advice on how I can get spark-submit to use my custom-built jars would be very useful.

Thanks,
Raghav

On Jun 16, 2015, at 6:57 PM, Will Briggs <wrbri...@gmail.com> wrote:

In general, you should avoid making direct changes to the Spark source code. If you are using Scala, you can seamlessly blend your own methods on top of the base RDDs using implicit conversions.

Regards,
Will

On June 16, 2015, at 7:53 PM, raggy <raghav0110...@gmail.com> wrote:

I am trying to submit a Spark application using the command line, via the spark-submit command. I initially set up my Spark application in Eclipse and have been making changes there. I recently obtained my own version of the Spark source code and added a new method to RDD.scala. I created a new spark-core jar using mvn, added it to my Eclipse build path, and my application ran perfectly fine. Now, I would like to submit it through the command line.

I submitted my application like this:

bin/spark-submit --master local[2] --class SimpleApp /Users/XXX/Desktop/spark2.jar

The spark-submit command is run from within the Spark project that I modified by adding new methods. When I do so, I get this error:

java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
    at SimpleApp$.main(SimpleApp.scala:12)
    at SimpleApp.main(SimpleApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

When I use spark-submit, where does the jar come from? How do I make sure it uses the jars that I have built?
Re: Submitting Spark Applications using Spark Submit
The documentation says spark.driver.userClassPathFirst can only be used in cluster mode. Does this mean I have to set the --deploy-mode option for spark-submit to cluster, or can I still use the default client mode? My understanding is that even in the default deploy mode, Spark still uses the slave machines I have on EC2.

Also, the spark.driver.extraLibraryPath property mentions that I can provide a path for special libraries on the spark-submit command line. Do my jar files in this path have to have the same name as the jar used by Spark, or is it intelligent enough to identify that two jars are supposed to be the same thing? If they have to have the same name, how can I find out the name I should use for my jar? E.g., if I just name my modified spark-core jar spark.jar, put it in a lib folder, and provide the path of the folder to spark-submit, would that be enough to tell Spark to use that spark-core jar instead of the default?

Thanks,
Raghav

On Jun 16, 2015, at 7:19 PM, Will Briggs <wrbri...@gmail.com> wrote:

If this is research-only, and you don't want to have to worry about updating the jars installed by default on the cluster, you can add your custom Spark jar using the spark.driver.extraLibraryPath configuration property when running spark-submit, and then use the experimental spark.driver.userClassPathFirst config to force it to use yours. See here for more details and options: https://spark.apache.org/docs/1.4.0/configuration.html

On June 16, 2015, at 10:12 PM, Raghav Shankar <raghav0110...@gmail.com> wrote:

I made the change so that I could implement top() using treeReduce(). A member on here suggested I make the change in RDD.scala to accomplish that. Also, this is for a research project, and not for commercial use. So, any advice on how I can get spark-submit to use my custom-built jars would be very useful.

Thanks,
Raghav

On Jun 16, 2015, at 6:57 PM, Will Briggs <wrbri...@gmail.com> wrote:

In general, you should avoid making direct changes to the Spark source code. If you are using Scala, you can seamlessly blend your own methods on top of the base RDDs using implicit conversions.

Regards,
Will

On June 16, 2015, at 7:53 PM, raggy <raghav0110...@gmail.com> wrote:

I am trying to submit a Spark application using the command line, via the spark-submit command. I initially set up my Spark application in Eclipse and have been making changes there. I recently obtained my own version of the Spark source code and added a new method to RDD.scala. I created a new spark-core jar using mvn, added it to my Eclipse build path, and my application ran perfectly fine. Now, I would like to submit it through the command line.

I submitted my application like this:

bin/spark-submit --master local[2] --class SimpleApp /Users/XXX/Desktop/spark2.jar

The spark-submit command is run from within the Spark project that I modified by adding new methods. When I do so, I get this error:

java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
    at SimpleApp$.main(SimpleApp.scala:12)
    at SimpleApp.main(SimpleApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

When I use spark-submit, where does the jar come from? How do I make sure it uses the jars that I have built?
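Putting Will's suggestion together with the original command, the invocation would look something like the following sketch (the jar paths are the placeholders already used in this thread). One caveat worth checking against the linked configuration page: spark.driver.extraLibraryPath sets the native library path (java.library.path), while spark.driver.extraClassPath is the property that prepends jars to the driver's JVM classpath, so the latter is likely the one wanted for a jar; whether this works in client mode is exactly the open question above.

bin/spark-submit --master local[2] --class SimpleApp \
  --conf spark.driver.extraClassPath=/Users/XXX/Desktop/lib/spark.jar \
  --conf spark.driver.userClassPathFirst=true \
  /Users/XXX/Desktop/spark2.jar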
Re: Different Sorting RDD methods in Apache Spark
Thank you for your responses! You mention that it only works as long as the data fits on a single machine. What I am trying to do is receive the sorted contents of my dataset. For this to be possible, the entire dataset has to be able to fit on a single machine. Are you saying that sorting the entire dataset and collecting it on the driver node is not a typical use case? If I wanted to do this using sortBy(), I would first call sortBy() followed by a collect(); collect() would involve gathering all the data on a single machine as well.

Thanks,
Raghav

On Tuesday, June 9, 2015, Mark Hamstra <m...@clearstorydata.com> wrote:

Correct. Trading away scalability for increased performance is not an option for the standard Spark API.

On Tue, Jun 9, 2015 at 3:05 AM, Daniel Darabos <daniel.dara...@lynxanalytics.com> wrote:

It would be even faster to load the data on the driver and sort it there without using Spark :). Using reduce() is cheating, because it only works as long as the data fits on one machine. That is not the targeted use case of a distributed computation system. You can repeat your test with more data (that doesn't fit on one machine) to see what I mean.

On Tue, Jun 9, 2015 at 8:30 AM, raggy <raghav0110...@gmail.com> wrote:

For a research project, I tried sorting the elements in an RDD, using two different approaches. In the first approach, I applied a mapPartitions() function to the RDD, so that it would sort the contents of each partition and produce a result RDD that contains one sorted list per partition as its only records. Then, I applied a reduce function which basically merges sorted lists. In the second approach I used the sortBy method in Spark. I ran these experiments on an EC2 cluster containing 30 nodes, set up using the Spark EC2 script, with the data file stored in HDFS.

I performed these operations on the US census data (100 MB) found here. A single line looks like this:

9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not in universe or children, Not in universe, White, All other, Female, Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not in universe, Not in universe, Child 18 never marr not in subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in universe, 0, Both parents present, United-States, United-States, United-States, Native- Born in the United States, 0, Not in universe, 0, 0, 94, - 5.

I sorted based on the 25th value in the CSV; in this line that is 1758.14. I noticed that sortBy performs worse than the other method. Is this the expected scenario? If it is, why wouldn't mapPartitions() and reduce() be the default sorting approach?

Here is my implementation:

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

public static void sortBy(JavaSparkContext sc) {
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
    long start = System.currentTimeMillis();
    rdd.sortBy(new Function<String, Double>() {
        @Override
        public Double call(String v1) throws Exception {
            String[] arr = v1.split(",");
            return Double.parseDouble(arr[24]);
        }
    }, true, 9).collect();
    long end = System.currentTimeMillis();
    System.out.println("SortBy: " + (end - start));
}

public static void sortList(JavaSparkContext sc) {
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
    long start = System.currentTimeMillis();
    // Sort each partition locally; each output record is one sorted list.
    JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(
        new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>() {
            @Override
            public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t) throws Exception {
                LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>();
                while (t.hasNext()) {
                    String s = t.next();
                    String[] arr1 = s.split(",");
                    lines.add(new Tuple2<Double, String>(Double.parseDouble(arr1[24]), s));
                }
                // IncomeComparator: user-defined comparator on the Double key (not shown).
                Collections.sort(lines, new IncomeComparator());
                LinkedList<LinkedList<Tuple2<Double, String>>> list =
                    new LinkedList<LinkedList<Tuple2<Double, String>>>();
                list.add(lines);
                return list;
            }
        });
}
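For comparison, the same two approaches are considerably shorter in Scala. A rough sketch, not from the thread (field index 24 and the 32/9 partition counts mirror the Java version above, and "/data.txt" is the same placeholder path):

// Given an existing SparkContext sc:
val rdd = sc.textFile("/data.txt", 32)

// Approach 2: Spark's built-in distributed sort on the 25th CSV field.
val bySortBy = rdd.sortBy(_.split(",")(24).toDouble, ascending = true, numPartitions = 9).collect()

// Approach 1: sort each partition locally, then combine the sorted runs in reduce().
val sortedParts = rdd.mapPartitions { it =>
  Iterator.single(it.map(s => (s.split(",")(24).toDouble, s)).toVector.sortBy(_._1))
}
val byReduce = sortedParts.reduce { (a, b) =>
  (a ++ b).sortBy(_._1) // a linear merge of the two sorted runs would be cheaper
}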
Re: TreeReduce Functionality in Spark
Hey Reza,

Thanks for your response! Your response clarifies some of my initial thoughts. However, what I don't understand is how the depth of the tree is used to determine how many intermediate reducers there will be, and how many partitions are sent to the intermediate reducers. Could you provide some insight into this?

Thanks,
Raghav

On Thursday, June 4, 2015, Reza Zadeh <r...@databricks.com> wrote:

In a regular reduce, all partitions have to send their reduced value to a single machine, and that machine can become a bottleneck. In a treeReduce, the partitions talk to each other in a logarithmic number of rounds. Imagine a binary tree that has all the partitions at its leaves and the root will contain the final reduced value. This way there is no single bottleneck machine. It remains to decide the number of children each node should have and how deep the tree should be, which is some of the logic in the method you pasted.

On Wed, Jun 3, 2015 at 7:10 PM, raggy <raghav0110...@gmail.com> wrote:

I am trying to understand what the treeReduce function for an RDD does, and how it is different from the normal reduce function. My current understanding is that treeReduce tries to split up the reduce into multiple steps: we do a partial reduce on different nodes, and then a final reduce is done to get the final result. Is this correct? If so, I guess what I am curious about is how Spark decides how many nodes will be on each level, and how many partitions will be sent to a given node. The bulk of the implementation is within this function:

partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
  .getOrElse(throw new UnsupportedOperationException("empty collection"))

The above call expands to:

val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// If creating an extra level doesn't help reduce
// the wall-clock time, we stop tree aggregation.
while (numPartitions > scale + numPartitions / scale) {
  numPartitions /= scale
  val curNumPartitions = numPartitions
  partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) =>
    iter.map((i % curNumPartitions, _))
  }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
}
partiallyAggregated.reduce(cleanCombOp)

I am completely lost about what is happening in this function. I would greatly appreciate some sort of explanation.
Re: TreeReduce Functionality in Spark
Hey DB,

Thanks for the reply! I still don't think this answers my question. For example, if I have a top() action being executed, with 32 workers (32 partitions), and I choose a depth of 4, what does the overlay of intermediate reducers look like? How many reducers are there, excluding the master and the workers? How many partitions get sent to each of these intermediate reducers? Does this number vary at each level?

Thanks!

On Thursday, June 4, 2015, DB Tsai <dbt...@dbtsai.com> wrote:

By default, the depth of the tree is 2. Each partition will be one node.

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com

On Thu, Jun 4, 2015 at 10:46 AM, Raghav Shankar <raghav0110...@gmail.com> wrote:

Hey Reza,

Thanks for your response! Your response clarifies some of my initial thoughts. However, what I don't understand is how the depth of the tree is used to determine how many intermediate reducers there will be, and how many partitions are sent to the intermediate reducers. Could you provide some insight into this?

Thanks,
Raghav

On Thursday, June 4, 2015, Reza Zadeh <r...@databricks.com> wrote:

In a regular reduce, all partitions have to send their reduced value to a single machine, and that machine can become a bottleneck. In a treeReduce, the partitions talk to each other in a logarithmic number of rounds. Imagine a binary tree that has all the partitions at its leaves and the root will contain the final reduced value. This way there is no single bottleneck machine. It remains to decide the number of children each node should have and how deep the tree should be, which is some of the logic in the method you pasted.

On Wed, Jun 3, 2015 at 7:10 PM, raggy <raghav0110...@gmail.com> wrote:

I am trying to understand what the treeReduce function for an RDD does, and how it is different from the normal reduce function. My current understanding is that treeReduce tries to split up the reduce into multiple steps: we do a partial reduce on different nodes, and then a final reduce is done to get the final result. Is this correct? If so, I guess what I am curious about is how Spark decides how many nodes will be on each level, and how many partitions will be sent to a given node. The bulk of the implementation is within this function:

partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
  .getOrElse(throw new UnsupportedOperationException("empty collection"))

The above call expands to:

val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// If creating an extra level doesn't help reduce
// the wall-clock time, we stop tree aggregation.
while (numPartitions > scale + numPartitions / scale) {
  numPartitions /= scale
  val curNumPartitions = numPartitions
  partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) =>
    iter.map((i % curNumPartitions, _))
  }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
}
partiallyAggregated.reduce(cleanCombOp)

I am completely lost about what is happening in this function. I would greatly appreciate some sort of explanation.
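To make the loop's behavior concrete, here is the arithmetic for the exact case asked about above (32 partitions, depth 4), traced directly from the code in the quoted message:

scale = max(ceil(32^(1/4)), 2) = max(3, 2) = 3
Round 1: 32 > 3 + 32/3 = 13, so 32 partitions are combined down to 32/3 = 10
Round 2: 10 > 3 + 10/3 = 6, so 10 partitions are combined down to 10/3 = 3
Round 3: 3 > 3 + 3/3 = 4 is false, so the loop stops

So there is no dedicated set of intermediate reducer machines: each round is just a reduceByKey over the same executors, with partially aggregated partition i hashed to reducer i % curNumPartitions (so roughly scale input partitions merge into each output partition per round), and the last 3 partial results are combined by an ordinary reduce() on the driver. With the default depth of 2 that DB mentions, scale = ceil(sqrt(32)) = 6, and a single round reduces 32 partitions to 5 before the final reduce().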
Re: Task result in Spark Worker Node
Hey Imran,

Thanks for the great explanation! This cleared up a lot of things for me. I am actually trying to utilize some of the features within Spark for a system I am developing. I am currently working on a subsystem that can be integrated with Spark and other big data solutions. In order to integrate it with Spark, I am trying to utilize the RDDs and the functions provided to the reduce method in my system. My system is developed in Scala and Java.

In Spark, I have seen that the function provided to the reduce method, along with the RDD, gets serialized and sent to the worker nodes. The worker nodes are able to deserialize them and then execute the task on them. I see this happening in ResultTask.scala. When I try to do something similar, I get exceptions. The system I am developing has the Spark jars in its build path, so it is able to create a SparkContext, etc. When I do the following (similar to DAGScheduler.scala), I get the proper result and can print it out:

val bytes = closureSerializer.serialize((rdd, func): AnyRef).array()
val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
  ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader)
println(func2(context, rdd2.iterator(rdd2.partitions(1), context)))

But when I involve the network by serializing the data, using the network to send it to a different program, then deserializing the data and using the function, I get the following error:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
    at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31)
    at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30)
    at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
    at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
    at SimpleApp$.net(SimpleApp.scala:71)
    at SimpleApp$.main(SimpleApp.scala:76)
    at SimpleApp.main(SimpleApp.scala)

I have also made sure that I am adding the class file of the program that is sending the serialized data to the bin folder of the program that is receiving the data. I'm not sure what I am doing wrong. I've done the serialization and creation of the function similarly to how Spark does it. I created another reduce function like this; when implemented this way, it prints out the result of func2 properly, but when I involve the network by sending the serialized data to another program, I get the above exception.

def reduceMod(f: (Integer, Integer) => Integer): Integer = {
  val reducePartition: Iterator[Integer] => Option[Integer] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(f))
    } else {
      None
    }
  }
  val processFunc = (context: TaskContext, iter: Iterator[Integer]) => reducePartition(iter)
  val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) => Int]
  context = new TaskContextImpl(stageId = 1, partitionId = 1, taskAttemptId = 1,
    attemptNumber = 1, runningLocally = false)
  println(func.getClass.getName)
  println(func(context, rdd.iterator(rdd.partitions(1), context)))
  val bb = closureSerializer.serialize((rdd, func): AnyRef).array()
  val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
    ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader)
  println(func2(context, rdd3.iterator(rdd3.partitions(1), context)))
  1
}

I was wondering if you had any ideas on what I am doing wrong, or how I can properly send the serialized version of the RDD and function to my other program. My thought is that I might need to add more jars to the build path, but I have no clue whether that is the issue and which jars I would need to add.

Thanks,
Raghav

On Apr 13, 2015, at 10:22 PM, Imran Rashid <iras...@cloudera.com> wrote:

On the worker side, it all happens in Executor. The task result is computed here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210

Then it's serialized along with some other goodies, and finally sent back to the driver here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255

What happens on the driver is quite a bit more complicated, and involves a number of spots in the code, but at least to get you started, the results are received here:
Re: Task result in Spark Worker Node
My apologies, I had pasted the wrong exception trace in the previous email. Here is the actual exception that I am receiving:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154)
    at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

On Apr 17, 2015, at 2:30 AM, Raghav Shankar <raghav0110...@gmail.com> wrote:

Hey Imran,

Thanks for the great explanation! This cleared up a lot of things for me. I am actually trying to utilize some of the features within Spark for a system I am developing. I am currently working on a subsystem that can be integrated with Spark and other big data solutions. In order to integrate it with Spark, I am trying to utilize the RDDs and the functions provided to the reduce method in my system. My system is developed in Scala and Java.

In Spark, I have seen that the function provided to the reduce method, along with the RDD, gets serialized and sent to the worker nodes. The worker nodes are able to deserialize them and then execute the task on them. I see this happening in ResultTask.scala. When I try to do something similar, I get exceptions. The system I am developing has the Spark jars in its build path, so it is able to create a SparkContext, etc. When I do the following (similar to DAGScheduler.scala), I get the proper result and can print it out:

val bytes = closureSerializer.serialize((rdd, func): AnyRef).array()
val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
  ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader)
println(func2(context, rdd2.iterator(rdd2.partitions(1), context)))

But when I involve the network by serializing the data, using the network to send it to a different program, then deserializing the data and using the function, I get the exception above.

I have also made sure that I am adding the class file of the program that is sending the serialized data to the bin folder of the program that is receiving the data. I'm not sure what I am doing wrong. I've done the serialization and creation of the function similarly to how Spark does it. I created another reduce function like this; when implemented this way, it prints out the result of func2 properly, but when I involve the network by sending the serialized data to another program, I get the above exception.

def reduceMod(f: (Integer, Integer) => Integer): Integer = {
  val reducePartition: Iterator[Integer] => Option[Integer] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(f))
    } else {
      None
    }
  }
  val processFunc = (context: TaskContext, iter: Iterator[Integer]) => reducePartition(iter)
  val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) => Int]
  context = new TaskContextImpl(stageId = 1, partitionId = 1, taskAttemptId = 1,
    attemptNumber = 1, runningLocally = false)
  println(func.getClass.getName)
  println(func(context, rdd.iterator(rdd.partitions(1), context)))
  val bb = closureSerializer.serialize((rdd, func): AnyRef).array()
  val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
    ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader)
  println(func2(context, rdd3.iterator(rdd3.partitions(1), context)))
  1
}

I was wondering if you had any ideas on what I am doing wrong, or how I can properly send the serialized version of the RDD and function to my other program. My thought is that I might need to add more jars to the build path, but I have no clue whether that is the issue and which jars I would need to add.

Thanks,
Raghav

On Apr 13, 2015, at 10:22 PM, Imran Rashid <iras...@cloudera.com> wrote:

On the worker side, it all happens in Executor. The task result is computed here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210
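A likely explanation for both NullPointerExceptions, though it is not confirmed in this thread, is that an RDD is only a driver-side handle: several of its fields, including its reference to the SparkContext and, for ParallelCollectionRDD, the backing sequence itself, are marked @transient and are deliberately skipped during serialization. A deserialized RDD in a different JVM therefore has those fields set to null, and the first call that touches them, such as partitions(), fails. A simplified sketch of the relevant declarations (paraphrased from the Spark 1.x sources, not copied verbatim):

// org.apache.spark.rdd.RDD: the context is excluded from serialization,
// so a deserialized RDD in another JVM has no SparkContext.
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]) extends Serializable {
  // ...
}

// org.apache.spark.rdd.ParallelCollectionRDD: the data itself is @transient,
// so a deserialized copy cannot recompute its partitions -- consistent with
// the NPE in slice()/getPartitions() above.
private[spark] class ParallelCollectionRDD[T: ClassTag](
    @transient sc: SparkContext,
    @transient data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]]) extends RDD[T](sc, Nil) {
  // ...
}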
Re: Sending RDD object over the network
Hey Akhil,

Thanks for your response! No, I am not expecting to receive the values themselves. I am just trying to receive the RDD object in my second Spark application. However, I get an NPE when I try to use the object within my second program. Would you know how I can properly send the RDD object to my second program?

Thanks,
Raghav

On Mon, Apr 6, 2015 at 3:08 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Are you expecting to receive the values 1 to 100 in your second program? An RDD is just an abstraction; you would need to do something like:

nums.foreach(x => send(x))

Thanks
Best Regards

On Mon, Apr 6, 2015 at 1:56 AM, raggy <raghav0110...@gmail.com> wrote:

For a class project, I am trying to have 2 Spark applications communicate with each other by passing an RDD object created in one application to the other. The first application is developed in Scala, creates an RDD, and sends it to the second application over the network as follows:

val logFile = "../../spark-1.3.0/README.md" // Should be some file on your system
val conf = new SparkConf()
conf.setAppName("Simple Application").setMaster("local[2]")
val sc = new SparkContext(conf)
val nums = sc.parallelize(1 to 100, 2).toJavaRDD()

val s = new Socket("127.0.0.1", 8000)
val objectOutput = new ObjectOutputStream(s.getOutputStream())
objectOutput.writeObject(nums)
s.close()

The second Spark application is a Java application, which tries to receive the RDD object and then perform some operations on it. At the moment, I am trying to see if I have properly obtained the object.

ServerSocket listener = null;
Socket client;
try {
    listener = new ServerSocket(8000);
} catch (Exception e) {
    e.printStackTrace();
}
System.out.println("Listening");
try {
    client = listener.accept();
    ObjectInputStream objectInput = new ObjectInputStream(client.getInputStream());
    Object object = objectInput.readObject();
    JavaRDD tmp = (JavaRDD) object;
    if (tmp != null) {
        System.out.println(tmp.getStorageLevel().toString());
        List<Partition> p = tmp.partitions();
    } else {
        System.out.println("variable is null");
    }
} catch (Exception e) {
    e.printStackTrace();
}

The output I get is:

StorageLevel(false, false, false, false, 1)
java.lang.NullPointerException
    at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154)
    at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:56)
    at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
    at SimpleApp.main(SimpleApp.java:35)

So tmp.getStorageLevel() prints properly, but tmp.partitions() throws the NullPointerException. I can't figure out why this is happening.

In a nutshell, I am trying to create an RDD object in one Spark application and then send the object to another application. After receiving the object, I try to make sure I received it properly by accessing its methods. Invoking the partitions() method in the original Spark application does not throw any errors. I would greatly appreciate any suggestion on how I can solve my problem, or an alternative solution for what I am trying to accomplish.
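Following Akhil's point that an RDD is just an abstraction over distributed data, a workable alternative (a sketch, not something proposed in this thread) is to ship the materialized values instead of the RDD handle, and rebuild an RDD on the receiving side:

import java.io.ObjectOutputStream
import java.net.Socket

// Sender (given an existing SparkContext sc): materialize on the driver
// and send plain, serializable data rather than the RDD itself.
val nums = sc.parallelize(1 to 100, 2)
val payload: Array[Int] = nums.collect()
val out = new ObjectOutputStream(new Socket("127.0.0.1", 8000).getOutputStream())
out.writeObject(payload)
out.close()

// Receiver (in the second application, with its own SparkContext sc2):
// val payload = objectInput.readObject().asInstanceOf[Array[Int]]
// val rdd = sc2.parallelize(payload.toSeq, 2)  // a fresh, locally owned RDD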