Re: PMML support in spark
Hi Pranay, I don’t think anyone’s working on this right now, but contributions would be welcome if this is a thing we could plug into MLlib. Matei On Nov 6, 2013, at 8:44 PM, Pranay Tonpay pranay.ton...@impetus.co.in wrote: Hi, Wanted to know if PMML support in Spark is there in the roadmap for Spark… PMML has been of interest with a lot of our customers… Thx pranay
Re: java.io.NotSerializableException on RDD count() in Java
No problem - thanks for helping us diagnose this! On Tue, Nov 5, 2013 at 5:04 AM, Yadid Ayzenberg ya...@media.mit.edu wrote: Ah, I see. Thanks very much for your assistance Patrick and Reynold. As a workaround for now, I implemented the SC field as transient and it's working fine. Yadid On 11/3/13 9:05 PM, Reynold Xin wrote: Yea, so every inner class actually contains a field referencing the outer class. In your case, the anonymous class DoubleFlatMapFunction actually has a this$0 field referencing the outer class AnalyticsEngine, which is why Spark is trying to serialize AnalyticsEngine. In the Scala API, the closure (which is really just implemented as anonymous classes) has a field called $outer, and Spark uses a closure cleaner that goes into the anonymous class to remove the $outer field if it is not used in the closure itself. In Java, the compiler generates a field called this$0, so the closure cleaner doesn't find it and can't clean it properly. I will work on a fix for the closure cleaner to clean this up as well. Meantime, you can work around this by either defining your anonymous class as a static nested class, or marking the JavaSparkContext field as transient. On Sun, Nov 3, 2013 at 7:32 PM, Patrick Wendell pwend...@gmail.com wrote: Hm, I think you are triggering a bug in the Java API where closures may not be properly cleaned. I think @rxin has reproduced this, deferring to him. - Patrick On Sun, Nov 3, 2013 at 5:25 PM, Yadid Ayzenberg ya...@media.mit.edu wrote: Code is below. In the code, rdd.count() works, but rdd2.count() fails.

public class AnalyticsEngine implements Serializable {
    private static AnalyticsEngine engine = null;
    private JavaSparkContext sc;
    final Logger logger = LoggerFactory.getLogger(AnalyticsEngine.class);
    private Properties prop;
    String db_host;

    private AnalyticsEngine() {
        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        System.setProperty("spark.kryo.registrator", "edu.mit.bsense.MyRegistrator");
        sc = new JavaSparkContext("local[4]", "TestSpark");
        Properties prop = new Properties();
        try {
            prop.load(new FileInputStream("config.properties"));
            db_host = prop.getProperty("database_host1");
            logger.info("Database host: {}", db_host);
        } catch (FileNotFoundException ex) {
            logger.info("Could not read config.properties: " + ex.toString());
        } catch (IOException ex) {
            logger.info("Could not read config.properties: " + ex.toString());
        }
    }

    public void getData() {
        Configuration conf = new Configuration();
        String conf_url = "mongodb://" + db_host + "/test.data1"; // this is the data partition
        conf.set("mongo.input.uri", conf_url);
        conf.set("mongo.input.query", "{\"streamId\":\"" + 13 + "\"},{\"data\":1}");
        conf.set("mongo.input.split_size", "64");
        JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class);
        rdd.cache();
        logger.info("Count of rdd: {}", rdd.count());
        logger.info("==");
        JavaDoubleRDD rdd2 = rdd.flatMap(new DoubleFlatMapFunction<Tuple2<Object, BSONObject>>() {
            @Override
            public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
                BSONObject doc = e._2();
                BasicDBList vals = (BasicDBList) doc.get("data");
                List<Double> results = new ArrayList<Double>();
                for (int i = 0; i < vals.size(); i++)
                    results.add((Double) ((BasicDBList) vals.get(i)).get(0));
                return results;
            }
        });
        logger.info("Take: {}", rdd2.take(100));
        logger.info("Count: {}", rdd2.count());
    }
}

On 11/3/13 8:19 PM, Patrick Wendell wrote: Thanks, that would help. This would be consistent with there being a reference to the SparkContext itself inside of the closure.
Just want to make sure that's not the case. On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg ya...@media.mit.edu wrote: I'm running in local[4] mode - so there are no slave machines. Full stack trace: (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) at
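For readers hitting the same exception, here is a minimal Scala sketch of the workaround idea described above, with hypothetical class and field names (this is not Yadid's code): mark the context field @transient so it is never serialized, and keep closures free of references to the enclosing object, for example by copying any needed fields into local vals.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical example: the closure passed to filter() must not drag in
// the enclosing instance, or Spark will try to serialize it.
class Engine(@transient val sc: SparkContext) extends Serializable {
  val threshold = 10

  def countAbove(rdd: RDD[Int]): Long = {
    // Copy the field into a local val; the closure then captures only `t`,
    // not `this`, so the (non-serializable) SparkContext is never shipped.
    val t = threshold
    rdd.filter(x => x > t).count()
  }
}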
Spark and geospatial data
Hello, I'm a developer on the GeoTrellis project (http://geotrellis.github.io). We do fast raster processing over large data sets, from web-time (sub-100ms) processing for live endpoints to distributed raster analysis over clusters using Akka clustering. There's currently discussion underway about moving to support a Spark backend for doing large scale distributed raster analysis. You can see the discussion here: https://groups.google.com/forum/#!topic/geotrellis-user/wkUOhFwYAvc. Any contributions to the discussion would be welcome. My question to the list is, is there currently any development towards a geospatial data story for Spark, that is, using Spark for large scale raster/vector spatial data analysis? Is there anyone using Spark currently for this sort of work? Thanks, Rob Emanuele
Re: Spark and geospatial data
Hello Rob, As you may know, I have long experience with geospatial data, and I'm now investigating Spark... So I'll be very interested in further answers, but also in helping move this great idea forward! For instance, I'd say that implementing classical geospatial algorithms like classification, feature extraction, pyramid generation and so on would make a good geo-extension lib for Spark, and this would be easier using the GeoTrellis API. My only question, for now, is that GeoTrellis has its own notion of lineage and so does Spark, so maybe some harmonization work will have to be done to serialize and schedule them? Maybe Pickles could help with the serialization part... Sorry if I missed something (or even said something silly ^^)... I'm heading to the thread you mentioned now! Looking forward ;) Cheers andy On Thu, Nov 7, 2013 at 8:49 PM, Rob Emanuele lossy...@gmail.com wrote: Hello, I'm a developer on the GeoTrellis project (http://geotrellis.github.io). We do fast raster processing over large data sets, from web-time (sub-100ms) processing for live endpoints to distributed raster analysis over clusters using Akka clustering. There's currently discussion underway about moving to support a Spark backend for doing large scale distributed raster analysis. You can see the discussion here: https://groups.google.com/forum/#!topic/geotrellis-user/wkUOhFwYAvc. Any contributions to the discussion would be welcome. My question to the list is, is there currently any development towards a geospatial data story for Spark, that is, using Spark for large scale raster/vector spatial data analysis? Is there anyone using Spark currently for this sort of work? Thanks, Rob Emanuele
Re: Spark and geospatial data
Hi Andy, There would be a large architectural design effort if we decided to support Spark, or replace our current internal actor system with Spark. My thoughts are that the Spark DAG would be fully utilized in tracking lineage and scheduling tasks for the Spark backend, while our current Actor system would route operations using its own mechanisms. There will have to be a lot of thought put into where exactly the API would split between the Spark backend and our own dedicated Actor system backend, and some harmonization would need to happen; we'd love to incorporate a lot of the great ideas Spark has for scheduling tasks, but also keep a situation where local and high-speed use cases do not need to run through unnecessary machinery, for performance at the small scale. This is all in early stages of consideration, so any input on design ideas is very welcome! The aim from the start of a Spark support story would be for all GeoTrellis operations that currently support distribution over tiled rasters to also be supported in the Spark environment, so Map Algebra operations like Classification would be carried over as a first step. As far as feature extraction and pyramid generation, these are operations that GeoTrellis currently does not have (besides basic vectorization capabilities), as our focus has been more on implementing fast Map Algebra operations, but these would certainly be great additions to any geospatial data analysis library. Thanks for your ideas, and looking forward to your participation. Cheers, Rob On Thu, Nov 7, 2013 at 3:05 PM, andy petrella andy.petre...@gmail.com wrote: Hello Rob, As you may know, I have long experience with geospatial data, and I'm now investigating Spark... So I'll be very interested in further answers, but also in helping move this great idea forward! For instance, I'd say that implementing classical geospatial algorithms like classification, feature extraction, pyramid generation and so on would make a good geo-extension lib for Spark, and this would be easier using the GeoTrellis API. My only question, for now, is that GeoTrellis has its own notion of lineage and so does Spark, so maybe some harmonization work will have to be done to serialize and schedule them? Maybe Pickles could help with the serialization part... Sorry if I missed something (or even said something silly ^^)... I'm heading to the thread you mentioned now! Looking forward ;) Cheers andy On Thu, Nov 7, 2013 at 8:49 PM, Rob Emanuele lossy...@gmail.com wrote: Hello, I'm a developer on the GeoTrellis project (http://geotrellis.github.io). We do fast raster processing over large data sets, from web-time (sub-100ms) processing for live endpoints to distributed raster analysis over clusters using Akka clustering. There's currently discussion underway about moving to support a Spark backend for doing large scale distributed raster analysis. You can see the discussion here: https://groups.google.com/forum/#!topic/geotrellis-user/wkUOhFwYAvc. Any contributions to the discussion would be welcome. My question to the list is, is there currently any development towards a geospatial data story for Spark, that is, using Spark for large scale raster/vector spatial data analysis? Is there anyone using Spark currently for this sort of work?
Thanks, Rob Emanuele -- Rob Emanuele, GIS Software Engineer Azavea | 340 N 12th St, Ste 402, Philadelphia, PA remanu...@azavea.com | T 215.701.7692 | F 215.925.2663 | Web: http://www.azavea.com | Blog: http://www.azavea.com/Blogs | Twitter: @azavea (http://twitter.com/azavea)
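To make the discussion a bit more concrete, here is a rough Scala sketch of one way tiled rasters could be represented as Spark RDDs keyed by tile index, with a local Map Algebra operation expressed as a join plus a per-tile map. Every name here (TileKey, Tile, localAdd) is a hypothetical placeholder rather than GeoTrellis or Spark API; this is only a design sketch of the kind of thing a Spark backend might do.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

// Hypothetical tile types; GeoTrellis has its own raster and tile classes.
case class TileKey(col: Int, row: Int)
case class Tile(cells: Array[Double]) {
  // A "local" map algebra operation combines two tiles cell by cell.
  def localAdd(other: Tile): Tile =
    Tile(cells.zip(other.cells).map { case (a, b) => a + b })
}

object RasterOnSparkSketch {
  // Each raster becomes an RDD of (tile index, tile); a local operation is
  // then a join on tile index followed by a map over the paired tiles.
  def add(sc: SparkContext, a: Seq[(TileKey, Tile)], b: Seq[(TileKey, Tile)]) = {
    val rddA = sc.parallelize(a)
    val rddB = sc.parallelize(b)
    rddA.join(rddB).mapValues { case (ta, tb) => ta.localAdd(tb) }
  }
}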
Where is reduceByKey?
On the front page of the Spark website (http://spark.incubator.apache.org/) there is the following simple word count implementation: file = spark.textFile("hdfs://...") file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) The same code can be found in the Quick Start guide (http://spark.incubator.apache.org/docs/latest/quick-start.html). When I follow the steps in my spark-shell (version 0.8.0) it works fine. The reduceByKey method is also shown in the list of transformations in the Spark Programming Guide (http://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#transformations). The bottom of this list directs the reader to the API docs for the class RDD (this link is broken, BTW). The API docs for RDD (http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.RDD) do not list a reduceByKey method for RDD. Also, when I try to compile the above code in a Scala class definition I get the following compile error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[(java.lang.String, Int)] I am compiling with maven using the following dependency definition: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.9.3</artifactId> <version>0.8.0-incubating</version> </dependency> Can someone help me understand why this code works fine from the spark-shell but doesn't seem to exist in the API docs and won't compile? Thanks, Philip
what happens if default parallelism is set to 4x cores?
Will that cause a hit to performance or cause the program to crash? Thanks
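For context (not an authoritative answer to the question above): in Spark 0.8 this is controlled by the spark.default.parallelism system property, which sets the default number of tasks for shuffle operations such as reduceByKey when no level is given explicitly, and the tuning guide of that era suggests running more tasks than cores (roughly 2-3 per core), so 4x cores mainly means more, smaller tasks. A minimal sketch of how the property is typically set; the value 32 below is a hypothetical 4x for an 8-core machine:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ParallelismSketch {
  def main(args: Array[String]) {
    // Must be set before the SparkContext is created (Spark 0.8 style config).
    System.setProperty("spark.default.parallelism", "32")

    val sc = new SparkContext("local[8]", "ParallelismSketch")
    val counts = sc.parallelize(1 to 1000).map(i => (i % 10, 1)).reduceByKey(_ + _)
    println(counts.collect().mkString(", "))
    sc.stop()
  }
}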
Re: Performance drop / unstable in 0.8 release
Hi, I have all the code for the previous 0.8 version. But how can I find out the SNAPSHOT version there? (In project/SparkBuild.scala it just says version := 0.8.0-SNAPSHOT.) Best, Wenlei On Wed, Nov 6, 2013 at 12:09 AM, Reynold Xin r...@apache.org wrote: I don't even think task stealing / speculative execution is turned on by default. Do you know what snapshot version you used for 0.8 previously? On Mon, Nov 4, 2013 at 12:03 PM, Wenlei Xie wenlei@gmail.com wrote: Hi, I have an iterative program written in Spark that had been tested under a snapshot version of Spark 0.8 before. After I ported it to the 0.8 release version, I see performance drops on large datasets. I am wondering if there is any clue? I monitored the number of partitions on each machine (by looking at DAGScheduler.getCacheLocs). I observed that a machine may have 30 partitions in one iteration but only 10 partitions in the next iteration. This is something I didn't observe in the older version. Thus I am wondering if the release version does task stealing more aggressively (for better dynamic load balancing?) Thank you! Best Regards, Wenlei -- Wenlei Xie (谢文磊) Department of Computer Science 5132 Upson Hall, Cornell University Ithaca, NY 14853, USA Phone: (607) 255-5577 Email: wenlei@gmail.com
Re: suppressing logging in REPL
It seems that I need to have the log4j.properties file in the current directory. So if I launch spark-shell in spark/conf I see that INFO is not displayed. On Thu, Nov 7, 2013 at 2:16 PM, Shay Seng s...@1618labs.com wrote: When is the log4j.properties file read... and how can I verify that it is being read? Do I need to have the log4j.properties file set before the cluster is launched? I have the following: root@ ~/spark] more ./conf/log4j.properties # Set everything to be logged to the console log4j.rootCategory=WARN, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.eclipse.jetty=WARN But I'm still seeing INFO logging. root@ ~/spark] ./spark-shell Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 0.8.0 /_/ Using Scala version 2.9.3 (OpenJDK 64-Bit Server VM, Java 1.7.0_45) Initializing interpreter... 13/11/07 22:13:38 INFO server.Server: jetty-7.x.y-SNAPSHOT 13/11/07 22:13:38 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:41012 Creating SparkContext... 13/11/07 22:14:24 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started 13/11/07 22:14:25 INFO spark.SparkEnv: Registering BlockManagerMaster 13/11/07 22:14:25 INFO storage.MemoryStore: MemoryStore started with capacity 3.8 GB. 13/11/07 22:14:25 INFO storage.DiskStore: Created local directory at /mnt/spark/spark-local-20131107221425-152f 13/11/07 22:14:25 INFO storage.DiskStore: Created local directory at /mnt2/spark/spark-local-20131107221425-a692 13/11/07 22:14:25 INFO network.ConnectionManager: Bound socket to port 45595 with id = ConnectionManagerId(ip-10-138-103-193.ap-southeast-1.compute.internal,45595) 13/11/07 22:14:25 INFO storage.BlockManagerMaster: Trying to register BlockManager 13/11/07 22:14:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager ip-10-138-103-193.ap-southeast-1.compute.internal:45595 with 3.8 GB RAM On Wed, Nov 6, 2013 at 7:04 AM, Shay Seng s...@1618labs.com wrote: By the right place you mean in the conf directory, right? I'll give it another try when I relaunch my cluster this morning. Weird. When I first modified the file, it looked like it worked, but I can't remember exactly... Then I had a REPL hang and I had to ctrl-c out of that... After that the logging started back up. On Nov 6, 2013 12:17 AM, Reynold Xin r...@apache.org wrote: Are you sure you put the log4j file in the right place? I just tried this with your configuration file, and this is what I see: rxin @ rxin-air : /scratch/rxin/incubator-spark ./spark-shell Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT /_/ Using Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_65) Initializing interpreter... Creating SparkContext... Spark context available as sc. Type in expressions to have them evaluated. Type :help for more information.
scala sc.parallelize(1 to 10, 2).count res0: Long = 10 On Tue, Nov 5, 2013 at 2:36 PM, Shay Seng s...@1618labs.com wrote: Hi, I added a log4j.properties file in spark/conf more ./spark/conf/log4j.properties # Set everything to be logged to the console log4j.rootCategory=WARN, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.eclipse.jetty=WARN But yet, when I launch the REPL, I still see INFO logs... what am I missing here? Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 0.8.0 /_/ Using Scala version 2.9.3 (OpenJDK 64-Bit Server VM, Java 1.7.0_45) Initializing interpreter... 13/11/05 22:33:11 INFO server.Server: jetty-7.x.y-SNAPSHOT 13/11/05 22:33:11 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:49376 Creating SparkContext... 13/11/05 22:33:21 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started 13/11/05 22:33:21 INFO spark.SparkEnv: Registering BlockManagerMaster 13/11/05 22:33:21 INFO storage.MemoryStore: MemoryStore started with capacity 3.8 GB. 13/11/05 22:33:21 INFO storage.DiskStore: Created local directory at /mnt/spark/spark-local-20131105223321-9086 13/11/05 22:33:21 INFO storage.DiskStore: Created local directory at /mnt2/spark/spark-local-20131105223321-b94d
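If the conf/log4j.properties route keeps getting ignored, another common workaround (a sketch, not something suggested in this thread) is to lower the log level programmatically once the shell is up, using the log4j 1.x API that Spark 0.8 logs through:

import org.apache.log4j.{Level, Logger}

// Paste into the spark-shell after it starts: silence everything below WARN.
Logger.getRootLogger.setLevel(Level.WARN)
// Jetty is noisy even at INFO, which is why the conf file above also raises its level.
Logger.getLogger("org.eclipse.jetty").setLevel(Level.WARN)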
Spark Summit agenda posted
Hi everyone, We're glad to announce the agenda of the Spark Summit, which will happen on December 2nd and 3rd in San Francisco. We have 5 keynotes and 24 talks lined up, from 18 different companies. Check out the agenda here: http://spark-summit.org/agenda/. This will be the biggest Spark event yet, with some very cool use case talks, so we hope to see you there! Sign up now to still get access to the early-bird registration rate. Matei
Re: cluster hangs for no apparent reason
I am not sure. But in their RDD paper they mentioned the usage of broadcast variables. Sometimes you need local variables in many map-reduce jobs and you do not want to copy them to all worker nodes multiple times. Then a broadcast variable is a good choice. 2013/11/7 Walrus theCat walrusthe...@gmail.com Shangyu, Thanks for the tip re: the flag! Maybe the broadcast variable is only for complex data structures? On Sun, Nov 3, 2013 at 7:58 PM, Shangyu Luo lsy...@gmail.com wrote: I met the problem of 'Too many open files' before. One solution is adding 'ulimit -n 10' in the spark-env.sh file. Basically, I think the local variable may not be a problem as I have written programs with local variables as parameters for functions and the programs work. 2013/11/3 Walrus theCat walrusthe...@gmail.com Hi Shangyu, I appreciate your ongoing correspondence. To clarify, my solution didn't work, and I didn't expect it to. I was digging through the logs, and I found a series of exceptions (in only one of the workers): 13/11/03 17:51:05 INFO client.DefaultHttpClient: Retrying connect 13/11/03 17:51:05 INFO http.AmazonHttpClient: Unable to execute HTTP request: Too many open files java.net.SocketException: Too many open files ... at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:808) ... I don't know why, because I do close those streams, but I'll look into it. As an aside, I make references to a spark.util.Vector from a parallelized context (in an RDD.map operation), as per the Logistic Regression example that Spark came with, and it seems to work out (the following is from the examples; you'll see that 'w' is not a broadcast variable, and 'points' is an RDD):

var w = Vector(D, _ => 2 * rand.nextDouble - 1)
println("Initial w: " + w)
for (i <- 1 to ITERATIONS) {
  println("On iteration " + i)
  val gradient = points.map { p =>
    (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
  }.reduce(_ + _)
  w -= gradient
}

On Sun, Nov 3, 2013 at 10:47 AM, Shangyu Luo lsy...@gmail.com wrote: Hi Walrus, Thank you for sharing your solution to your problem. I think I have met a similar problem before (i.e., one machine is working while others are idle) and I just wait for a long time and the program will continue processing. I am not sure how your program filters an RDD by a locally stored set. If the set is a parameter of a function, I assume it should be copied to all worker nodes. But it is good that you solved your problem with a broadcast variable and the running time seems reasonable! 2013/11/3 Walrus theCat walrusthe...@gmail.com Hi Shangyu, Thanks for responding. This is a refactor of other code that isn't completely scalable because it pulls stuff to the driver. This code keeps everything on the cluster. I left it running for 7 hours, and the log just froze. I checked ganglia, and only one machine's CPU seemed to be doing anything. The last output in the log left my code at a spot where it is filtering an RDD by a locally stored set. No error was thrown. I thought that was OK based on the example code, but just in case, I changed it so it's a broadcast variable. The un-refactored code (that pulls all the data to the driver from time to time) runs in minutes. I've never had the problem before of the log just getting non-responsive, and was wondering if anyone knew of any heuristics I could check. Thank you On Sat, Nov 2, 2013 at 2:55 PM, Shangyu Luo lsy...@gmail.com wrote: Yes, I think so. The running time depends on what work you are doing and how large it is.
2013/11/1 Walrus theCat walrusthe...@gmail.com That's what I thought, too. So is it not hanging, just recalculating for a very long time? The log stops updating and it just gives the output I posted. If there are any suggestions as to parameters to change, or any other data, it would be appreciated. Thank you, Shangyu. On Fri, Nov 1, 2013 at 11:31 AM, Shangyu Luo lsy...@gmail.com wrote: I think the missing parent may not be abnormal. From my understanding, when a Spark task cannot find its parent, it can use some metadata to find the result of its parent or recalculate its parent's value. Imagine that in a loop, a Spark task tries to find some value from the last iteration's result. 2013/11/1 Walrus theCat walrusthe...@gmail.com Are there heuristics to check when the scheduler says it is missing parents and just hangs? On Thu, Oct 31, 2013 at 4:56 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I'm not sure what's going on here. My code seems to be working thus far (map at SparkLR:90 completed). What can I do to help the scheduler out here? Thanks 13/10/31 02:10:13 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(10, 211) 13/10/31 02:10:13 INFO scheduler.DAGScheduler: Stage 10 (map at SparkLR.scala:90) finished in 0.923 s
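For reference, a minimal Scala sketch of the change Walrus describes above: filtering an RDD by a locally stored set via a broadcast variable instead of capturing the set directly in the closure (the names and data here are made up for illustration):

import org.apache.spark.SparkContext

object BroadcastFilterSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "BroadcastFilterSketch")

    // The locally stored set we want to filter by.
    val keepIds: Set[Int] = Set(1, 2, 3, 5, 8)

    // Ship the set to the workers once as a broadcast variable,
    // rather than serializing it with every task's closure.
    val keepIdsBc = sc.broadcast(keepIds)

    val data = sc.parallelize(1 to 1000)
    val kept = data.filter(x => keepIdsBc.value.contains(x))
    println("Kept " + kept.count() + " elements")
    sc.stop()
  }
}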
Re: Where is reduceByKey?
Thanks - I think this would be a helpful note to add to the docs. I went and read a few things about Scala implicit conversions (I'm obviously new to the language) and it seems like a very powerful language feature and now that I know about them it will certainly be easy to identify when they are missing (i.e. the first thing to suspect when you see a not a member compilation message.) I'm still a bit mystified as to how you would go about finding the appropriate imports except that I suppose you aren't very likely to use methods that you don't already know about! Unless you are copying code verbatim that doesn't have the necessary import statements On 11/7/2013 4:05 PM, Matei Zaharia wrote: Yeah, this is confusing and unfortunately as far as I know it’s API specific. Maybe we should add this to the documentation page for RDD. The reason for these conversions is to only allow some operations based on the underlying data type of the collection. For example, Scala collections support sum() as long as they contain numeric types. That’s fine for the Scala collection library since its conversions are imported by default, but I guess it makes it confusing for third-party apps. Matei On Nov 7, 2013, at 1:15 PM, Philip Ogren philip.og...@oracle.com mailto:philip.og...@oracle.com wrote: I remember running into something very similar when trying to perform a foreach on java.util.List and I fixed it by adding the following import: import scala.collection.JavaConversions._ And my foreach loop magically compiled - presumably due to a another implicit conversion. Now this is the second time I've run into this problem and I didn't recognize it. I'm not sure that I would know what to do the next time I run into this. Do you have some advice on how I should have recognized a missing import that provides implicit conversions and how I would know what to import? This strikes me as code obfuscation. I guess this is more of a Scala question Thanks, Philip On 11/7/2013 2:01 PM, Josh Rosen wrote: The additional methods on RDDs of pairs are defined in a class called PairRDDFunctions (https://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.PairRDDFunctions). SparkContext provides an implicit conversion from RDD[T] to PairRDDFunctions[T] to make this transparent to users. To import those implicit conversions, use import org.apache.spark.SparkContext._ These conversions are automatically imported by Spark Shell, but you'll have to import them yourself in standalone programs. On Thu, Nov 7, 2013 at 11:54 AM, Philip Ogren philip.og...@oracle.com mailto:philip.og...@oracle.com wrote: On the front page http://spark.incubator.apache.org/ of the Spark website there is the following simple word count implementation: file = spark.textFile(hdfs://...) file.flatMap(line = line.split( )).map(word = (word, 1)).reduceByKey(_ + _) The same code can be found in the Quick Start http://spark.incubator.apache.org/docs/latest/quick-start.html quide. When I follow the steps in my spark-shell (version 0.8.0) it works fine. The reduceByKey method is also shown in the list of transformations http://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#transformations in the Spark Programming Guide. The bottom of this list directs the reader to the API docs for the class RDD (this link is broken, BTW). The API docs for RDD http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.RDD does not list a reduceByKey method for RDD. 
Also, when I try to compile the above code in a Scala class definition I get the following compile error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[(java.lang.String, Int)] I am compiling with maven using the following dependency definition: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.9.3</artifactId> <version>0.8.0-incubating</version> </dependency> Can someone help me understand why this code works fine from the spark-shell but doesn't seem to exist in the API docs and won't compile? Thanks, Philip
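To put Josh's answer into a complete standalone program, here is a minimal Scala sketch (the object name is made up) showing the import that makes reduceByKey compile outside the shell:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // brings the RDD -> PairRDDFunctions implicit conversion into scope

object WordCount {
  def main(args: Array[String]) {
    val spark = new SparkContext("local[4]", "WordCount")
    val file = spark.textFile("hdfs://...")
    val counts = file.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)  // compiles only because of the SparkContext._ import above
    counts.take(10).foreach(println)
    spark.stop()
  }
}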
Re: Where is reduceByKey?
Yeah, it’s true that this feature doesn’t provide any way to give good error messages. Maybe some IDEs will support it eventually, though I haven’t seen it. Matei On Nov 7, 2013, at 3:46 PM, Philip Ogren philip.og...@oracle.com wrote: Thanks - I think this would be a helpful note to add to the docs. I went and read a few things about Scala implicit conversions (I'm obviously new to the language) and it seems like a very powerful language feature and now that I know about them it will certainly be easy to identify when they are missing (i.e. the first thing to suspect when you see a not a member compilation message.) I'm still a bit mystified as to how you would go about finding the appropriate imports except that I suppose you aren't very likely to use methods that you don't already know about! Unless you are copying code verbatim that doesn't have the necessary import statements On 11/7/2013 4:05 PM, Matei Zaharia wrote: Yeah, this is confusing and unfortunately as far as I know it’s API specific. Maybe we should add this to the documentation page for RDD. The reason for these conversions is to only allow some operations based on the underlying data type of the collection. For example, Scala collections support sum() as long as they contain numeric types. That’s fine for the Scala collection library since its conversions are imported by default, but I guess it makes it confusing for third-party apps. Matei On Nov 7, 2013, at 1:15 PM, Philip Ogren philip.og...@oracle.com wrote: I remember running into something very similar when trying to perform a foreach on java.util.List and I fixed it by adding the following import: import scala.collection.JavaConversions._ And my foreach loop magically compiled - presumably due to a another implicit conversion. Now this is the second time I've run into this problem and I didn't recognize it. I'm not sure that I would know what to do the next time I run into this. Do you have some advice on how I should have recognized a missing import that provides implicit conversions and how I would know what to import? This strikes me as code obfuscation. I guess this is more of a Scala question Thanks, Philip On 11/7/2013 2:01 PM, Josh Rosen wrote: The additional methods on RDDs of pairs are defined in a class called PairRDDFunctions (https://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.PairRDDFunctions). SparkContext provides an implicit conversion from RDD[T] to PairRDDFunctions[T] to make this transparent to users. To import those implicit conversions, use import org.apache.spark.SparkContext._ These conversions are automatically imported by Spark Shell, but you'll have to import them yourself in standalone programs. On Thu, Nov 7, 2013 at 11:54 AM, Philip Ogren philip.og...@oracle.com wrote: On the front page of the Spark website there is the following simple word count implementation: file = spark.textFile(hdfs://...) file.flatMap(line = line.split( )).map(word = (word, 1)).reduceByKey(_ + _) The same code can be found in the Quick Start quide. When I follow the steps in my spark-shell (version 0.8.0) it works fine. The reduceByKey method is also shown in the list of transformations in the Spark Programming Guide. The bottom of this list directs the reader to the API docs for the class RDD (this link is broken, BTW). The API docs for RDD does not list a reduceByKey method for RDD. 
Also, when I try to compile the above code in a Scala class definition I get the following compile error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[(java.lang.String, Int)] I am compiling with maven using the following dependency definition: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.9.3</artifactId> <version>0.8.0-incubating</version> </dependency> Can someone help me understand why this code works fine from the spark-shell but doesn't seem to exist in the API docs and won't compile? Thanks, Philip