Spark SQL: preferred syntax for column reference?
I'm just getting started with Spark SQL and DataFrames in 1.3.0. I notice that the Spark API shows a different syntax for referencing columns in a dataframe than the Spark SQL Programming Guide does. For instance, the API docs for the select method show this: df.select($"colA", $"colB") Whereas the programming guide shows this: df.filter(df("name") > 21).show() I tested and both the $"column" and df("column") syntaxes work, but I'm wondering which is *preferred*. Is one the original and one a new feature we should be using? Thanks, Diana (Spark Curriculum Developer for Cloudera)
Spark 1.0 docs out of sync?
I'm hoping someone can clear up some confusion for me. When I view the Spark 1.0 docs online (http://spark.apache.org/docs/1.0.0/) they are different from the docs packaged with the Spark 1.0.0 download (spark-1.0.0.tgz). In particular, in the online docs, there's a single merged Spark Programming Guide [image: Inline image 1] Whereas in the docs in the download package there are still three separate guides: [image: Inline image 2] Plus there are several other differences: the color scheme is different (orange vs. blue), and there are several content differences. The first one is on the Overview page, e.g. "Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). All you need to run it is to have `java` installed on your system `PATH`" vs. "Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy to run locally on one machine --- all you need is to have `java` installed on your system `PATH`" Can someone clarify? And more importantly, where can I download the *official* 1.0 docs to build locally? Thanks! Diana
Re: logging in pyspark
foreach vs. map isn't the issue. Both require serializing the called function, so the pickle error would still apply, yes? And at the moment, I'm just testing. Definitely wouldn't want to log something for each element, but may want to detect something and log for SOME elements. So my question is: how are other people doing logging from distributed tasks, given the serialization issues? The same issue actually exists in Scala, too. I could work around it by creating a small serializable object that provides a logger, but it seems kind of kludgy to me, so I'm wondering if other people are logging from tasks, and if so, how? Diana

On Tue, May 6, 2014 at 6:24 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I think you're looking for RDD.foreach() (http://spark.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#foreach). According to the programming guide (http://spark.apache.org/docs/latest/scala-programming-guide.html): "Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems." Do you really want to log something for each element of your RDD? Nick

On Tue, May 6, 2014 at 3:31 PM, Diana Carroll dcarr...@cloudera.com wrote: What should I do if I want to log something as part of a task? This is what I tried. To set up a logger, I followed the advice here: http://py4j.sourceforge.net/faq.html#how-to-turn-logging-on-off

logger = logging.getLogger("py4j")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

This works fine when I call it from my driver (i.e. pyspark): logger.info("this works fine") But I want to try logging within a distributed task so I did this:

def logTestMap(a):
    logger.info("test")
    return a

myrdd.map(logTestMap).count()

and got: PicklingError: Can't pickle 'lock' object So it's trying to serialize my function and can't because of a lock object used in logger, presumably for thread-safety. But then...how would I do it? Or is this just a really bad idea? Thanks, Diana
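(Following up on my own question with the kludge I'm leaning toward. This is just an untested sketch with made-up names: build the logger lazily inside the task function, so pickling the closure only captures a reference to the logging module and never a handler or its lock. Any output then lands in each executor's stderr log rather than in the driver console.)

import logging

def log_test_map(x):
    # Sketch only: construct the logger inside the task, so serializing this
    # function never touches a handler (and its lock) created on the driver.
    log = logging.getLogger("task-logger")        # hypothetical logger name
    if not log.handlers:                          # configure once per worker process
        log.addHandler(logging.StreamHandler())   # StreamHandler defaults to stderr
        log.setLevel(logging.INFO)
    if x is not None:                             # placeholder for "log only SOME elements"
        log.info("interesting element: %s", x)
    return x

# myrdd.map(log_test_map).count()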
NotSerializableException in Spark Streaming
Hey all, trying to set up a pretty simple streaming app and getting some weird behavior. First, a non-streaming job that works fine: I'm trying to pull out lines of a log file that match a regex, for which I've set up a function:

def getRequestDoc(s: String): String = {
  "KBDOC-[0-9]*".r.findFirstIn(s).orNull
}
logs = sc.textFile("logfiles")
logs.map(getRequestDoc).take(10)

That works, but I want to run that on the same data, but streaming, so I tried this:

val logs = ssc.socketTextStream("localhost", ...)
logs.map(getRequestDoc).print()
ssc.start()

From this code, I get:

14/05/08 03:32:08 ERROR JobScheduler: Error running job streaming job 1399545128000 ms.0
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext

But if I do the map function inline instead of calling a separate function, it works:

logs.map("KBDOC-[0-9]*".r.findFirstIn(_).orNull).print()

So why is it able to serialize my little function in regular spark, but not in streaming? Thanks, Diana
logging in pyspark
What should I do if I want to log something as part of a task? This is what I tried. To set up a logger, I followed the advice here: http://py4j.sourceforge.net/faq.html#how-to-turn-logging-on-off

logger = logging.getLogger("py4j")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

This works fine when I call it from my driver (i.e. pyspark): logger.info("this works fine") But I want to try logging within a distributed task, so I did this:

def logTestMap(a):
    logger.info("test")
    return a

myrdd.map(logTestMap).count()

and got: PicklingError: Can't pickle 'lock' object So it's trying to serialize my function and can't because of a lock object used in logger, presumably for thread-safety. But then...how would I do it? Or is this just a really bad idea? Thanks, Diana
Re: performance improvement on second operation...without caching?
to be updated?

On Fri, May 2, 2014 at 9:04 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm just Posty McPostalot this week, sorry folks! :-) Anyway, another question today: I have a bit of code that is pretty time consuming (pasted at the end of the message): It reads in a bunch of XML files, parses them, extracts some data in a map, counts (using reduce), and then sorts. All stages are executed when I do a final operation (take). The first stage is the most expensive: on first run it takes 30s to a minute. I'm not caching anything. When I re-execute that take at the end, I expected it to re-execute all the same stages, and take approximately the same amount of time, but it didn't. The second take executes only a single stage, which runs very fast: the whole operation takes less than 1 second (down from 5 minutes!). While this is awesome (!) I don't understand it. If I'm not caching data, why would I see such a marked performance improvement on subsequent execution? (Or is this related to the known 0.9.1 bug about sortByKey executing an action when it shouldn't?) Thanks, Diana

(attachment: sparkdev_04-23_KEEP_FOR_BUILDS.png)

# load XML files containing device activation records.
# Find the most common device models activated
import xml.etree.ElementTree as ElementTree

# Given a partition containing multi-line XML, parse the contents.
# Return an iterator of activation Elements contained in the partition
def getactivations(fileiterator):
    s = ''
    for i in fileiterator: s = s + str(i)
    filetree = ElementTree.fromstring(s)
    return filetree.getiterator('activation')

# Get the model name from a device activation record
def getmodel(activation):
    return activation.find('model').text

filename = "hdfs://localhost/user/training/activations/*.xml"

# parse each partition as a file into an activation XML record
activations = sc.textFile(filename)
activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
models = activationTrees.map(lambda activation: getmodel(activation))

# count and sort activations by model
topmodels = models.map(lambda model: (model,1))\
    .reduceByKey(lambda v1,v2: v1+v2)\
    .map(lambda (model,count): (count,model))\
    .sortByKey(ascending=False)

# display the top 10 models
for (count,model) in topmodels.take(10):
    print "Model %s (%s)" % (model,count)

# repeat!
for (count,model) in topmodels.take(10):
    print "Model %s (%s)" % (model,count)
when to use broadcast variables
Anyone have any guidance on using a broadcast variable to ship data to workers vs. an RDD? Like, say I'm joining web logs in an RDD with user account data. I could keep the account data in an RDD or if it's small, a broadcast variable instead. How small is small? Small enough that I know it can easily fit in memory on a single node? Some other guideline? Thanks! Diana
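(In case a concrete sketch helps frame the question, here's roughly what I mean in PySpark. This is illustrative only; the paths, file format, and web-log layout are all made up: keep the small account table as a plain dict, broadcast it, and do the join in a map, versus a full RDD join.)

# Hypothetical sketch: map-side join via a broadcast variable.
# Assumes accounts.csv is small enough to collect to the driver.
accounts = dict(
    sc.textFile("hdfs:///data/accounts.csv")              # made-up path
      .map(lambda line: line.split(","))
      .map(lambda fields: (fields[0], fields[1]))          # (user_id, account_name)
      .collect())
accounts_bc = sc.broadcast(accounts)

def with_account(log_line):
    user_id = log_line.split(" ")[2]                       # made-up web log layout
    return (user_id, accounts_bc.value.get(user_id), log_line)

enriched = weblogs.map(with_account)                       # weblogs: assumed RDD of raw log lines

# The RDD-join alternative, for comparison:
# enriched = weblogs.map(lambda l: (l.split(" ")[2], l)).join(accounts_rdd)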
Re: the spark configuage
I'm guessing your shell stopping when it attempts to connect to the RM is not related to that warning. You'll get that message out of the box from Spark if you don't have HADOOP_HOME set correctly. I'm using CDH 5.0 installed in default locations, and got rid of the warning by setting HADOOP_HOME to /usr/lib/hadoop. The stopping issue might be something unrelated. Diana

On Wed, Apr 30, 2014 at 3:58 AM, Sophia sln-1...@163.com wrote: Hi, when I configure spark and run the shell instruction ./spark-shell, it tells me: WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable. When it connects to the ResourceManager, it stops. What should I do? Wish your reply -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
running SparkALS
Hi everyone. I'm trying to run some of the Spark example code, and most of it appears to be undocumented (unless I'm missing something). Can someone help me out? I'm particularly interested in running SparkALS, which wants parameters: M U F iter slices What are these variables? They appear to be integers and the default values are 100, 500 and 10 respectively but beyond that...huh? Thanks! Diana
Re: running SparkALS
Thanks, Deb. But I'm looking at org.apache.spark.examples.SparkALS, which is not in the mllib examples, and does not take any file parameters. I don't see the class you refer to in the examples ...however, if I did want to run that example, where would I find the file in question? It would be great if this were documented, perhaps in the source code. I'll add a JIRA. Thanks, Diana

On Mon, Apr 28, 2014 at 1:41 PM, Debasish Das debasish.da...@gmail.com wrote: Diana, Here are the parameters: ./bin/spark-class org.apache.spark.mllib.recommendation.ALS Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir> [lambda] [implicitPrefs] [alpha] [blocks] Master: Local/Deployed spark cluster master ratings_file: Netflix format data rank: Reduced dimension of the User and Product factors iterations: How many ALS iterations you would like to run output_dir: Where to generate the user and product factors lambda: regularization for nuclear norm implicitPrefs: true will run Koren's netflix prize paper's implicit algorithm alpha: is valid for implicitPrefs blocks: How many blocks you want to partition your rating, user and product factor matrix. I have run with 64 GB JVM with 20M users, 1.6M ratings and 50 factors...you should be able to go even beyond that if you want to increase the JVM size... The scalability issue comes from the fact that each JVM has to collect either user/product factors before doing a BLAS posv solve...seems like that's the bottleneck step...but making double to float is one way to scale it even further... Thanks. Deb

On Mon, Apr 28, 2014 at 10:30 AM, Diana Carroll dcarr...@cloudera.com wrote: Hi everyone. I'm trying to run some of the Spark example code, and most of it appears to be undocumented (unless I'm missing something). Can someone help me out? I'm particularly interested in running SparkALS, which wants parameters: M U F iter slices What are these variables? They appear to be integers and the default values are 100, 500 and 10 respectively but beyond that...huh? Thanks! Diana
Re: running SparkALS
Should I file a JIRA to remove the example? I think it is confusing to include example code without explanation of how to run it, and it sounds like this one isn't worth running or reviewing anyway. On Mon, Apr 28, 2014 at 2:34 PM, Debasish Das debasish.da...@gmail.comwrote: Don't use SparkALS...that's the first version of the code and does not scale... Li is right...you have to do the dictionary generation on users, products and then generate indexed fileI wrote some utilities but looks like it is application dependentthe indexed netflix format is more generic... On Mon, Apr 28, 2014 at 10:47 AM, Diana Carroll dcarr...@cloudera.comwrote: Thanks, Deb. But I'm looking at org.apache.spark.examples.SparkALS, which is not in the mllib examples, and does not take any file parameters. I don't see the class you refer to in the examples ...however, if I did want to run that example, where would I find the file in question? It would be great if this were documented, perhaps in the source code. I'll add a JIRA. Thanks, Diana On Mon, Apr 28, 2014 at 1:41 PM, Debasish Das debasish.da...@gmail.comwrote: Diana, Here are the parameters: ./bin/spark-class org.apache.spark.mllib.recommendation.ALS Usage: ALS master ratings_file rank iterations output_dir [lambda] [implicitPrefs] [alpha] [blocks] Master: Local/Deployed spark cluster master ratings_file: Netflix format data rank: Reduced dimension of the User and Product factors iterations: How many ALS iterations you would like to run output_dir: Where to generate the usera and product factors lambda: regularization for nuclear norm implicitPrefs: true will run Koren's netflix prize paper's implicit algorithm alpha: is valid for implicitPrefs blocks: How many blocks you want to partition your rating, user and product factor matrix I have run with 64 GB JVM with 20M users, 1.6M ratings and 50 factorsyou should be able to go even beyond that if you want to increase the JVM size... The scalability issue comes from the fact that each JVM has to collect either user/product factors before doing a BLAS posv solveseems like that's the bottleneck step...but making double to float is one way to scale it even further... Thanks. Deb On Mon, Apr 28, 2014 at 10:30 AM, Diana Carroll dcarr...@cloudera.comwrote: Hi everyone. I'm trying to run some of the Spark example code, and most of it appears to be undocumented (unless I'm missing something). Can someone help me out? I'm particularly interested in running SparkALS, which wants parameters: M U F iter slices What are these variables? They appear to be integers and the default values are 100, 500 and 10 respectively but beyond that...huh? Thanks! Diana
checkpointing without streaming?
I'm trying to understand when I would want to checkpoint an RDD rather than just persist to disk. Every reference I can find to checkpoint related to Spark Streaming. But the method is defined in the core Spark library, not Streaming. Does it exist solely for streaming, or are there circumstances unrelated to streaming in which I might want to checkpoint...and if so, like what? Thanks, Diana
Re: checkpointing without streaming?
When might that be necessary or useful? Presumably I can persist and replicate my RDD to avoid re-computation, if that's my goal. What advantage does checkpointing provide over disk persistence with replication? On Mon, Apr 21, 2014 at 2:42 PM, Xiangrui Meng men...@gmail.com wrote: Checkpoint clears dependencies. You might need checkpoint to cut a long lineage in iterative algorithms. -Xiangrui On Mon, Apr 21, 2014 at 11:34 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm trying to understand when I would want to checkpoint an RDD rather than just persist to disk. Every reference I can find to checkpoint related to Spark Streaming. But the method is defined in the core Spark library, not Streaming. Does it exist solely for streaming, or are there circumstances unrelated to streaming in which I might want to checkpoint...and if so, like what? Thanks, Diana
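(Thinking out loud, here's an untested sketch of the iterative case Xiangrui describes, with a made-up checkpoint directory: the point of the checkpoint is to truncate an ever-growing lineage, not just to avoid recomputation.)

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   # made-up location

rdd = sc.parallelize(range(10000))
for i in range(50):
    rdd = rdd.map(lambda x: x + 1)      # lineage grows by one stage per iteration
    if (i + 1) % 10 == 0:
        rdd.cache()                     # keep it around so checkpointing doesn't recompute it
        rdd.checkpoint()                # mark for checkpointing: writes to reliable storage, clears lineage
        rdd.count()                     # an action forces the checkpoint to actually happen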
partitioning of small data sets
I loaded a very tiny file into Spark -- 23 lines of text, 2.6kb. Given the size, and that it is a single file, I assumed it would only be in a single partition. But when I cache it, I can see in the Spark App UI that it actually splits it into two partitions: [image: Inline image 1] Is this correct behavior? How does Spark decide how big a partition should be, or how many partitions to create for an RDD? If it matters, I have only a single worker in my cluster, so both partitions are stored on the same worker. The file was on HDFS and was only a single block. Thanks for any insight. Diana (attachment: sparkdev_2014-04-11.png)
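(If it's useful context for whoever answers: I believe textFile takes a minimum-partition hint as its second argument, and the default minimum is 2, which would explain the split even for a one-block file. A sketch with a made-up file name:)

# Sketch: the second argument to textFile is the minimum number of partitions;
# I believe it defaults to 2 in this version of Spark.
two_parts = sc.textFile("hdfs://localhost/user/training/tinyfile.txt")      # -> 2 partitions
one_part  = sc.textFile("hdfs://localhost/user/training/tinyfile.txt", 1)   # ask for just 1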
using Kryo with pyspark?
I'm looking at the Tuning Guide suggestion to use Kryo instead of default serialization. My questions: Does pyspark use Java serialization by default, as Scala spark does? If so, then... can I use Kryo with pyspark instead? The instructions say I should register my classes with the Kryo Serialization, but that's in Java/Scala. If I simply set the spark.serializer variable for my SparkContext, will it at least use Kryo for Spark's own classes, even if I can't register any of my own classes? Thanks, Diana
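(For what it's worth, here's the sketch I was planning to test. The property name is from the Tuning Guide; the app name is made up, and whether this buys much from Python is exactly my question, since the Python-side data is pickled either way.)

from pyspark import SparkConf, SparkContext

# Sketch: set the JVM-side serializer when creating the context.
conf = (SparkConf()
        .setAppName("kryo-test")   # made-up app name
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
sc = SparkContext(conf=conf)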
Re: network wordcount example
Not sure what data you are sending in. You could try calling lines.print() instead which should just output everything that comes in on the stream. Just to test that your socket is receiving what you think you are sending. On Mon, Mar 31, 2014 at 12:18 PM, eric perler ericper...@hotmail.comwrote: Hello i just started working with spark today... and i am trying to run the wordcount network example i created a socket server and client.. and i am sending data to the server in an infinite loop when i run the spark class.. i see this output in the console... --- Time: 1396281891000 ms --- 14/03/31 11:04:51 INFO SparkContext: Job finished: take at DStream.scala:586, took 0.056794606 s 14/03/31 11:04:51 INFO JobScheduler: Finished job streaming job 1396281891000 ms.0 from job set of time 1396281891000 ms 14/03/31 11:04:51 INFO JobScheduler: Total delay: 0.101 s for time 1396281891000 ms (execution: 0.058 s) 14/03/31 11:04:51 INFO TaskSchedulerImpl: Remove TaskSet 3.0 from pool but i dont see any output from the workcount operation when i make this call... wordCounts.print(); any help is greatly appreciated thanks in advance
streaming: code to simulate a network socket data source
If you are learning about Spark Streaming, as I am, you've probably used netcat (nc) as mentioned in the Spark Streaming programming guide. I wanted something a little more useful, so I modified the ClickStreamGenerator code to make a very simple script that simply reads a file off disk and passes it to a socket, character by character. You specify the port, filename, and bytesPerSecond that you want it to send. Thought someone else might find this helpful, so here it is.

import java.net.ServerSocket
import java.io.PrintWriter
import scala.io.Source

object StreamingDataGenerator {
  def main(args : Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: StreamingDataGenerator <port> <file> <bytesPerSecond>")
      System.exit(1)
    }
    val port = args(0).toInt
    val file = Source.fromFile(args(1))
    val bytesPerSecond = args(2).toFloat
    val sleepDelayMs = (1000.0 / bytesPerSecond).toInt
    val listener = new ServerSocket(port)

    println("Reading from file: " + file.descr)
    while (true) {
      println("Listening on port: " + port)
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connect from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          file.foreach(c => {
            Thread.sleep(sleepDelayMs)
            // write the byte to the socket
            out.write(c)
            out.flush()
            // also print the byte to stdout, for debugging ease
            print(c)
          })
          socket.close()
        }
      }.start()
    }
  }
}
spark streaming: what is awaitTermination()?
The API docs for ssc.awaitTermination say simply Wait for the execution to stop. Any exceptions that occurs during the execution will be thrown in this thread. Can someone help me understand what this means? What causes execution to stop? Why do we need to wait for that to happen? I tried removing it from my simple NetworkWordCount example (running locally, not on a cluster) and nothing changed. In both cases, I end my program by hitting Ctrl-C. Thanks for any insight you can give me. Diana
spark streaming and the spark shell
I'm working with spark streaming using spark-shell, and hoping folks could answer a few questions I have. I'm doing WordCount on a socket stream:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds
var ssc = new StreamingContext(sc, Seconds(5))
var mystream = ssc.socketTextStream("localhost", ...)
var words = mystream.flatMap(line => line.split(" "))
var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
wordCounts.print()
ssc.start()

1. I'm assuming that using spark shell is an edge case, and that spark streaming is really intended mostly for batch use. True?

2. I notice that once I call ssc.start(), my stream starts processing and continues indefinitely...even if I close the socket on the server end (I'm using the unix command nc to mimic a server, as explained in the streaming programming guide). Can I tell my stream to detect if it's lost a connection and therefore stop executing? (Or even better, to attempt to re-establish the connection?)

3. I tried entering ssc.stop, which resulted in an error:
Exception in thread Thread-43 org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
14/03/27 07:36:13 ERROR ConnectionManager: Corresponding SendingConnectionManagerId not found
But it did stop the DStream execution.

4. Then I tried restarting the ssc again (ssc.start) and got another error: org.apache.spark.SparkException: JobScheduler already started Is restarting an ssc supported?

5. When I perform an operation like wordCounts.print(), that operation will execute on each batch, every n seconds. Is there a way I can undo that operation? That is, I want it to *stop* executing that print every n seconds...without having to stop the stream. What I'm really asking is...can I explore DStreams interactively the way I can explore my data in regular Spark? In regular Spark, I might perform various operations on an RDD to see what happens. So at first, I might have used split(" ") to tokenize my input text, but now I want to try using split(",") instead, after the stream has already started running. Can I do that? I did find out that if I add a new operation to an existing dstream (say, words.print()) *after* the ssc.start it works. It *will* add the second print() call to the execution list every n seconds. But if I try to add new dstreams, e.g.

...
ssc.start()
var testpairs = words.map(x => (x, "TEST"))
testpairs.print()

I get an error:

14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time 139593227 ms
java.lang.Exception: org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been initialized

Is this sort of interactive use just not supported? Thanks! Diana
streaming questions
I'm trying to understand Spark streaming, hoping someone can help. I've kinda-sorta got a version of Word Count running, and it looks like this:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StreamingWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: StreamingWordCount <master> <hostname> <port>")
      System.exit(1)
    }
    val master = args(0)
    val hostname = args(1)
    val port = args(2).toInt

    val ssc = new StreamingContext(master, "Streaming Word Count", Seconds(2))
    val lines = ssc.socketTextStream(hostname, port)
    val words = lines.flatMap(line => line.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

(I also have a small script that sends text to that port.)

*Question 1:* When I run this, I don't get any output from the wordCounts.print as long as my data is still streaming. I have to stop my streaming data script before my program will display the word counts. Why is that? What if my stream is indefinite? I thought the point of Streaming was that it would process it in real time?

*Question 2:* While I run this (and the stream is still sending) I get continuous warning messages like this:
14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623200 already exists on this machine; not re-adding it
14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623400 already exists on this machine; not re-adding it
What does that mean?

*Question 3:* I tried replacing the wordCounts.print() line with wordCounts.saveAsTextFiles("file:/my/path/outdir"). This results in a new outdir-<timestamp> directory being created every two seconds...even if there's no data during that time period. Is there a way to tell it to save only if there's data? Thanks!
Re: streaming questions
Thanks, Tathagata, very helpful. A follow-up question below...

On Wed, Mar 26, 2014 at 2:27 PM, Tathagata Das tathagata.das1...@gmail.com wrote: *Answer 3:* You can do something like

wordCounts.foreachRDD((rdd: RDD[...], time: Time) => {
  if (rdd.take(1).size == 1) {
    // There exists at least one element in RDD, so save it to file
    rdd.saveAsTextFile(<generate file name based on time>)
  }
})

Is calling foreachRDD and performing an operation on each RDD individually as efficient as performing the operation on the dstream? Is this foreach pretty much what dstream.saveAsTextFiles is doing anyway? This also brings up a question I have about caching in the context of streaming. In this example, would I want to call rdd.cache()? I'm calling two successive operations on the same rdd (take(1) and then saveAsTextFile)...if I were doing this in regular Spark I'd want to cache so I wouldn't need to re-calculate the rdd for both calls. Does the same apply here? Thanks, Diana
quick start guide: building a standalone scala program
Has anyone successfully followed the instructions on the Quick Start page of the Spark home page to run a standalone Scala application? I can't, and I figure I must be missing something obvious! I'm trying to follow the instructions here as close to word for word as possible: http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala

1. The instructions don't say what directory to create my test application in, but later I'm instructed to run sbt/sbt so I conclude that my working directory must be $SPARK_HOME. (Temporarily ignoring that it is a little weird to be working directly in the Spark distro.)
2. Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala. Copy & paste in the code from the instructions exactly, replacing YOUR_SPARK_HOME with my spark home path.
3. Create $SPARK_HOME/mysparktest/simple.sbt. Copy & paste in the sbt file from the instructions.
4. From $SPARK_HOME I run sbt/sbt package. It runs through the ENTIRE Spark project! This takes several minutes, and at the end, it says Done packaging. Unfortunately, there's nothing in the $SPARK_HOME/mysparktest/ folder other than what I already had there.

(Just for fun, I also did what I thought was more logical, which was to set my working directory to $SPARK_HOME/mysparktest and run $SPARK_HOME/sbt/sbt package, but that was even less successful: I got an error:
awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory)
Attempting to fetch sbt
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory
Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/

So, help? I'm sure these instructions work because people are following them every day, but I can't tell what they are supposed to do. Thanks! Diana
Re: quick start guide: building a standalone scala program
Yana: Thanks. Can you give me a transcript of the actual commands you are running? THanks! Diana On Mon, Mar 24, 2014 at 3:59 PM, Yana Kadiyska yana.kadiy...@gmail.comwrote: I am able to run standalone apps. I think you are making one mistake that throws you off from there onwards. You don't need to put your app under SPARK_HOME. I would create it in its own folder somewhere, it follows the rules of any standalone scala program (including the layout). In the giude, $SPARK_HOME is only relevant to find the Readme file which they are parsing/word-counting. But otherwise the compile time dependencies on spark would be resolved via the sbt file (or the pom file if you look at the Java example). So for example I put my app under C:\Source\spark-code and the jar gets created in C:\Source\spark-code\target\scala-2.9.3 (or 2.10 if you're running with scala 2.10 as the example shows). But for that part of the guide, it's not any different than building a scala app. On Mon, Mar 24, 2014 at 3:44 PM, Diana Carroll dcarr...@cloudera.com wrote: Has anyone successfully followed the instructions on the Quick Start page of the Spark home page to run a standalone Scala application? I can't, and I figure I must be missing something obvious! I'm trying to follow the instructions here as close to word for word as possible: http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala 1. The instructions don't say what directory to create my test application in, but later I'm instructed to run sbt/sbt so I conclude that my working directory must be $SPARK_HOME. (Temporarily ignoring that it is a little weird to be working directly in the Spark distro.) 2. Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala. Copypaste in the code from the instructions exactly, replacing YOUR_SPARK_HOME with my spark home path. 3. Create $SPARK_HOME/mysparktest/simple.sbt. Copypaste in the sbt file from the instructions 4. From the $SPARK_HOME I run sbt/sbt package. It runs through the ENTIRE Spark project! This takes several minutes, and at the end, it says Done packaging. unfortunately, there's nothing in the $SPARK_HOME/mysparktest/ folder other than what I already had there. (Just for fun, I also did what I thought was more logical, which is set my working directory to $SPARK_HOME/mysparktest, and but $SPARK_HOME/sbt/sbt package, but that was even less successful: I got an error: awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/ So, help? I'm sure these instructions work because people are following them every day, but I can't tell what they are supposed to do. Thanks! Diana
Re: quick start guide: building a standalone scala program
Thanks, Nan Zhu. You say that my problems are because you are in Spark directory, don't need to do that actually , the dependency on Spark is resolved by sbt I did try it initially in what I thought was a much more typical place, e.g. ~/mywork/sparktest1. But as I said in my email: (Just for fun, I also did what I thought was more logical, which is set my working directory to $SPARK_HOME/mysparktest, and but $SPARK_HOME/sbt/sbt package, but that was even less successful: I got an error: awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/ On Mon, Mar 24, 2014 at 4:00 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, Diana, See my inlined answer -- Nan Zhu On Monday, March 24, 2014 at 3:44 PM, Diana Carroll wrote: Has anyone successfully followed the instructions on the Quick Start page of the Spark home page to run a standalone Scala application? I can't, and I figure I must be missing something obvious! I'm trying to follow the instructions here as close to word for word as possible: http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala 1. The instructions don't say what directory to create my test application in, but later I'm instructed to run sbt/sbt so I conclude that my working directory must be $SPARK_HOME. (Temporarily ignoring that it is a little weird to be working directly in the Spark distro.) You can create your application in any directory, just follow the sbt project dir structure 2. Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala. Copypaste in the code from the instructions exactly, replacing YOUR_SPARK_HOME with my spark home path. should be correct 3. Create $SPARK_HOME/mysparktest/simple.sbt. Copypaste in the sbt file from the instructions should be correct 4. From the $SPARK_HOME I run sbt/sbt package. It runs through the ENTIRE Spark project! This takes several minutes, and at the end, it says Done packaging. unfortunately, there's nothing in the $SPARK_HOME/mysparktest/ folder other than what I already had there. because you are in Spark directory, don't need to do that actually , the dependency on Spark is resolved by sbt (Just for fun, I also did what I thought was more logical, which is set my working directory to $SPARK_HOME/mysparktest, and but $SPARK_HOME/sbt/sbt package, but that was even less successful: I got an error: awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/ So, help? I'm sure these instructions work because people are following them every day, but I can't tell what they are supposed to do. Thanks! Diana
Re: quick start guide: building a standalone scala program
Thanks Ognen. Unfortunately I'm not able to follow your instructions either. In particular: "sbt compile" / "sbt run <arguments if any>" This doesn't work for me because there's no program on my path called sbt. The instructions in the Quick Start guide are specific that I should call $SPARK_HOME/sbt/sbt. I don't have any other executable on my system called sbt. Did you download and install sbt separately? In following the Quick Start guide, that was not stated as a requirement, and I'm trying to run through the guide word for word. Diana

On Mon, Mar 24, 2014 at 4:12 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Diana, Anywhere on the filesystem you have read/write access (you need not be in your spark home directory):

mkdir myproject
cd myproject
mkdir project
mkdir target
mkdir -p src/main/scala
cp $mypath/$mymysource.scala src/main/scala/
cp $mypath/myproject.sbt .

Make sure that myproject.sbt has the following in it:

name := "I NEED A NAME!"
version := "I NEED A VERSION!"
scalaVersion := "2.10.3"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "0.9.0-incubating"

If you will be using Hadoop/HDFS functionality you will need the below line also:

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"

The above assumes you are using Spark 0.9 and Scala 2.10.3. If you are using 0.8.1 - adjust appropriately. That's it. Now you can do: sbt compile, sbt run <arguments if any>. You can also do sbt package to produce a jar file of your code which you can then add to the SparkContext at runtime. In a more complicated project you may need to have a bit more involved hierarchy like com.github.dianacarroll, which will then translate to src/main/scala/com/github/dianacarroll/ where you can put your multiple .scala files which will then have to be a part of a package com.github.dianacarroll (you can just put that as your first line in each of these scala files). I am new to Java/Scala so this is how I do it. More educated Java/Scala programmers may tell you otherwise ;) You can get more complicated with the sbt project subdirectory but you can read independently about sbt and what it can do; above is the bare minimum. Let me know if that helped. Ognen

On 3/24/14, 2:44 PM, Diana Carroll wrote: Has anyone successfully followed the instructions on the Quick Start page of the Spark home page to run a standalone Scala application? I can't, and I figure I must be missing something obvious! I'm trying to follow the instructions here as close to word for word as possible: http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala 1. The instructions don't say what directory to create my test application in, but later I'm instructed to run sbt/sbt so I conclude that my working directory must be $SPARK_HOME. (Temporarily ignoring that it is a little weird to be working directly in the Spark distro.) 2. Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala. Copy & paste in the code from the instructions exactly, replacing YOUR_SPARK_HOME with my spark home path. 3. Create $SPARK_HOME/mysparktest/simple.sbt. Copy & paste in the sbt file from the instructions. 4. From $SPARK_HOME I run sbt/sbt package. It runs through the ENTIRE Spark project! This takes several minutes, and at the end, it says Done packaging. Unfortunately, there's nothing in the $SPARK_HOME/mysparktest/ folder other than what I already had there.
(Just for fun, I also did what I thought was more logical, which is set my working directory to $SPARK_HOME/mysparktest, and but $SPARK_HOME/sbt/sbt package, but that was even less successful: I got an error: awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/ So, help? I'm sure these instructions work because people are following them every day, but I can't tell what they are supposed to do. Thanks! Diana
Re: quick start guide: building a standalone scala program
Yeah, that's exactly what I did. Unfortunately it doesn't work: $SPARK_HOME/sbt/sbt package awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/ On Mon, Mar 24, 2014 at 4:25 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: You can use any sbt on your machine, including the one that comes with spark. For example, try: ~/path_to_spark/sbt/sbt compile ~/path_to_spark/sbt/sbt run arguments Or you can just add that to your PATH by: export $PATH=$PATH:~/path_to_spark/sbt To make it permanent, you can add it to your ~/.bashrc or ~/.bash_profile or ??? depending on the system you are using. If you are on Windows, sorry, I can't offer any help there ;) Ognen On 3/24/14, 3:16 PM, Diana Carroll wrote: Thanks Ongen. Unfortunately I'm not able to follow your instructions either. In particular: sbt compile sbt run arguments if any This doesn't work for me because there's no program on my path called sbt. The instructions in the Quick Start guide are specific that I should call $SPARK_HOME/sbt/sbt. I don't have any other executable on my system called sbt. Did you download and install sbt separately? In following the Quick Start guide, that was not stated as a requirement, and I'm trying to run through the guide word for word. Diana On Mon, Mar 24, 2014 at 4:12 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Diana, Anywhere on the filesystem you have read/write access (you need not be in your spark home directory): mkdir myproject cd myproject mkdir project mkdir target mkdir -p src/main/scala cp $mypath/$mymysource.scala src/main/scala/ cp $mypath/myproject.sbt . Make sure that myproject.sbt has the following in it: name := I NEED A NAME! version := I NEED A VERSION! scalaVersion := 2.10.3 libraryDependencies += org.apache.spark % spark-core_2.10 % 0.9.0-incubating If you will be using Hadoop/HDFS functionality you will need the below line also libraryDependencies += org.apache.hadoop % hadoop-client % 2.2.0 The above assumes you are using Spark 0.9 and Scala 2.10.3. If you are using 0.8.1 - adjust appropriately. That's it. Now you can do sbt compile sbt run arguments if any You can also do sbt package to produce a jar file of your code which you can then add to the SparkContext at runtime. In a more complicated project you may need to have a bit more involved hierarchy like com.github.dianacarroll which will then translate to src/main/scala/com/github/dianacarroll/ where you can put your multiple .scala files which will then have to be a part of a package com.github.dianacarroll (you can just put that as your first line in each of these scala files). I am new to Java/Scala so this is how I do it. More educated Java/Scala programmers may tell you otherwise ;) You can get more complicated with the sbt project subrirectory but you can read independently about sbt and what it can do, above is the bare minimum. Let me know if that helped. Ognen On 3/24/14, 2:44 PM, Diana Carroll wrote: Has anyone successfully followed the instructions on the Quick Start page of the Spark home page to run a standalone Scala application? I can't, and I figure I must be missing something obvious! 
I'm trying to follow the instructions here as close to word for word as possible: http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala 1. The instructions don't say what directory to create my test application in, but later I'm instructed to run sbt/sbt so I conclude that my working directory must be $SPARK_HOME. (Temporarily ignoring that it is a little weird to be working directly in the Spark distro.) 2. Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala. Copypaste in the code from the instructions exactly, replacing YOUR_SPARK_HOME with my spark home path. 3. Create $SPARK_HOME/mysparktest/simple.sbt. Copypaste in the sbt file from the instructions 4. From the $SPARK_HOME I run sbt/sbt package. It runs through the ENTIRE Spark project! This takes several minutes, and at the end, it says Done packaging. unfortunately, there's nothing in the $SPARK_HOME/mysparktest/ folder other than what I already had there. (Just for fun, I also did what I thought was more logical, which is set my working directory to $SPARK_HOME/mysparktest, and but $SPARK_HOME/sbt/sbt package, but that was even less successful: I got an error: awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line
Re: quick start guide: building a standalone scala program
Thanks for your help, everyone. Several folks have explained that I can surely solve the problem by installing sbt. But I'm trying to get the instructions working *as written on the Spark website*. The instructions not only don't have you install sbt separately...they actually specifically have you use the sbt that is distributed with Spark. If it is not possible to build your own Spark programs with Spark-distributed sbt, then that's a big hole in the Spark docs that I shall file. And if the sbt that is included with Spark is MEANT to be able to compile your own Spark apps, then that's a product bug. But before I file the bug, I'm still hoping I'm missing something, and someone will point out that I'm missing a small step that will make the Spark distribution of sbt work! Diana On Mon, Mar 24, 2014 at 4:52 PM, Yana Kadiyska yana.kadiy...@gmail.comwrote: Diana, I just tried it on a clean Ubuntu machine, with Spark 0.8 (since like other folks I had sbt preinstalled on my usual machine) I ran the command exactly as Ognen suggested and see Set current project to Simple Project (do you see this -- you should at least be seeing this) and then a bunch of Resolving ... messages. I did get an error there, saying it can't find javax.servlet.orbit. I googled the error and found this thread: http://mail-archives.apache.org/mod_mbox/spark-user/201309.mbox/%3ccajbo4nexyzqe6zgreqjtzzz5zrcoavfen+wmbyced6n1epf...@mail.gmail.com%3E adding the IvyXML fragment they suggested helped in my case (but again, the build pretty clearly complained). If you're still having no luck, I suggest installing sbt and setting SBT_HOME... http://www.scala-sbt.org/ In either case though, it's not a Spark-specific issue...Hopefully some of all this helps. On Mon, Mar 24, 2014 at 4:30 PM, Diana Carroll dcarr...@cloudera.com wrote: Yeah, that's exactly what I did. Unfortunately it doesn't work: $SPARK_HOME/sbt/sbt package awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/ On Mon, Mar 24, 2014 at 4:25 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: You can use any sbt on your machine, including the one that comes with spark. For example, try: ~/path_to_spark/sbt/sbt compile ~/path_to_spark/sbt/sbt run arguments Or you can just add that to your PATH by: export $PATH=$PATH:~/path_to_spark/sbt To make it permanent, you can add it to your ~/.bashrc or ~/.bash_profile or ??? depending on the system you are using. If you are on Windows, sorry, I can't offer any help there ;) Ognen On 3/24/14, 3:16 PM, Diana Carroll wrote: Thanks Ongen. Unfortunately I'm not able to follow your instructions either. In particular: sbt compile sbt run arguments if any This doesn't work for me because there's no program on my path called sbt. The instructions in the Quick Start guide are specific that I should call $SPARK_HOME/sbt/sbt. I don't have any other executable on my system called sbt. Did you download and install sbt separately? In following the Quick Start guide, that was not stated as a requirement, and I'm trying to run through the guide word for word. 
Diana On Mon, Mar 24, 2014 at 4:12 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Diana, Anywhere on the filesystem you have read/write access (you need not be in your spark home directory): mkdir myproject cd myproject mkdir project mkdir target mkdir -p src/main/scala cp $mypath/$mymysource.scala src/main/scala/ cp $mypath/myproject.sbt . Make sure that myproject.sbt has the following in it: name := I NEED A NAME! version := I NEED A VERSION! scalaVersion := 2.10.3 libraryDependencies += org.apache.spark % spark-core_2.10 % 0.9.0-incubating If you will be using Hadoop/HDFS functionality you will need the below line also libraryDependencies += org.apache.hadoop % hadoop-client % 2.2.0 The above assumes you are using Spark 0.9 and Scala 2.10.3. If you are using 0.8.1 - adjust appropriately. That's it. Now you can do sbt compile sbt run arguments if any You can also do sbt package to produce a jar file of your code which you can then add to the SparkContext at runtime. In a more complicated project you may need to have a bit more involved hierarchy like com.github.dianacarroll which will then translate to src/main/scala/com/github/dianacarroll/ where you can put your multiple .scala files which will then have to be a part of a package com.github.dianacarroll (you can just put
Re: Writing RDDs to HDFS
Ognen: I don't know why your process is hanging, sorry. But I do know that the way saveAsTextFile works is that you give it a path to a directory, not a file. The file is saved in multiple parts, corresponding to the partitions (part-00000, part-00001, etc.). (Presumably it does this because it allows each partition to be saved on the local disk, to minimize network traffic. It's how Hadoop works, too.)

On Mon, Mar 24, 2014 at 5:00 PM, Ognen Duzlevski og...@nengoiksvelzud.com wrote: Is someRDD.saveAsTextFile("hdfs://ip:port/path/final_filename.txt") supposed to work? Meaning, can I save files to the HDFS fs this way? I tried:

val r = sc.parallelize(List(1,2,3,4,5,6,7,8))
r.saveAsTextFile("hdfs://ip:port/path/file.txt")

and it is just hanging. At the same time on my HDFS it created file.txt but as a directory which has subdirectories (the final one is empty). Thanks! Ognen
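(A sketch of what I mean, in PySpark for brevity. The namenode and path are made up; the part-file names come from the Hadoop output format, though the exact digit padding may vary.)

r = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])
r.saveAsTextFile("hdfs://namenode:8020/user/me/output")   # made-up namenode/path

# On HDFS, "output" becomes a directory with one file per partition, roughly:
#   /user/me/output/part-00000
#   /user/me/output/part-00001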
Re: quick start guide: building a standalone scala program
It is suggested implicitly in giving you the command ./sbt/sbt. The separately installed sbt isn't in a folder called sbt, whereas Spark's version is. And more relevantly, just a few paragraphs earlier in the tutorial you execute the command sbt/sbt assembly which definitely refers to the spark install. On Monday, March 24, 2014, Nan Zhu zhunanmcg...@gmail.com wrote: I found that I never read the document carefully and I never find that Spark document is suggesting you to use Spark-distributed sbt.. Best, -- Nan Zhu On Monday, March 24, 2014 at 5:47 PM, Diana Carroll wrote: Thanks for your help, everyone. Several folks have explained that I can surely solve the problem by installing sbt. But I'm trying to get the instructions working *as written on the Spark website*. The instructions not only don't have you install sbt separately...they actually specifically have you use the sbt that is distributed with Spark. If it is not possible to build your own Spark programs with Spark-distributed sbt, then that's a big hole in the Spark docs that I shall file. And if the sbt that is included with Spark is MEANT to be able to compile your own Spark apps, then that's a product bug. But before I file the bug, I'm still hoping I'm missing something, and someone will point out that I'm missing a small step that will make the Spark distribution of sbt work! Diana On Mon, Mar 24, 2014 at 4:52 PM, Yana Kadiyska yana.kadiy...@gmail.comwrote: Diana, I just tried it on a clean Ubuntu machine, with Spark 0.8 (since like other folks I had sbt preinstalled on my usual machine) I ran the command exactly as Ognen suggested and see Set current project to Simple Project (do you see this -- you should at least be seeing this) and then a bunch of Resolving ... messages. I did get an error there, saying it can't find javax.servlet.orbit. I googled the error and found this thread: http://mail-archives.apache.org/mod_mbox/spark-user/201309.mbox/%3ccajbo4nexyzqe6zgreqjtzzz5zrcoavfen+wmbyced6n1epf...@mail.gmail.com%3E adding the IvyXML fragment they suggested helped in my case (but again, the build pretty clearly complained). If you're still having no luck, I suggest installing sbt and setting SBT_HOME... http://www.scala-sbt.org/ In either case though, it's not a Spark-specific issue...Hopefully some of all this helps. On Mon, Mar 24, 2014 at 4:30 PM, Diana Carroll dcarr...@cloudera.com wrote: Yeah, that's exactly what I did. Unfortunately it doesn't work: $SPARK_HOME/sbt/sbt package awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/ On Mon, Mar 24, 2014 at 4:25 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: You can use any sbt on your machine, including the one that comes with spark. For example, try: ~/path_to_spark/sbt/sbt compile ~/path_to_spark/sbt/sbt run arguments Or you can just add that to your PATH by: export $PATH=$PATH:~/path_to_spark/sbt To make it permanent, you can add it to your ~/.bashrc or ~/.bash_profile or ??? depending on the system you are using. If you are on Windows, sorry, I can't offer any help there ;) Ognen On 3/24/14, 3:16 PM, Diana Carroll wrote: Thanks Ongen. Unfortunately I'm not able to follow your instructions either. 
In particular: sbt compile sbt run arguments if any This doesn't work for me because there's no program on my path called sbt. The instructions in the Quick Start guide are specific that I sho
Re: example of non-line oriented input data?
If I don't call iter(), and just return treeiterator directly, I get an error message that the object is not of an iterator type. This is in Python 2.6...perhaps a bug? BUT I also realized my code was wrong. It results in an RDD containing all the tags in all the files. What I really want is an RDD where each record corresponds to a single file. So if I have a thousand files, I should have a thousand elements in my RDD, each of which is an ElementTree. (Which I can then use to map or flatMap to pull out the data I actually care about.) So, this works:

def parsefile(iterator):
    s = ''
    for i in iterator: s = s + str(i)
    yield ElementTree.fromstring(s)

I would think the ability to process very large numbers of smallish XML files is pretty common. The use case I'm playing with right now is using a knowledge base of HTML documents. Each document in the KB is a single file, which in my experience is not an unusual configuration. I'd like to be able to suck the whole KB into an RDD and then do analysis such as which keywords are most commonly used in the KB, or whether there is a correlation between certain user attributes and the KB articles they request, and so on. Unfortunately I'm not sure I'm best to answer your question about non-text InputFormats to support. I'm fairly new to Hadoop (about 8 months) and I'm not in the field. My background is in app servers, ecommerce and business process management, so that's my bias. From that perspective, it would be really useful to be able to work with XML/HTML and CSV files...but are those what big data analysts are actually using Spark for? I dunno. And, really, if I were actually in those fields, I'd be getting the data from a DB using Shark, right? Diana

On Tue, Mar 18, 2014 at 6:27 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi Diana, This seems to work without the iter() in front if you just return treeiterator. What happened when you didn't include that? Treeiterator should return an iterator. Anyway, this is a good example of mapPartitions. It's one where you want to view the whole file as one object (one XML here), so you couldn't implement this using a flatMap, but you still want to return multiple values. The MLlib example you saw needs Python 2.7 because unfortunately that is a requirement for our Python MLlib support (see http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries). We'd like to relax this later but we're using some newer features of NumPy and Python. The rest of PySpark works on 2.6. In terms of the size in memory, here both the string s and the XML tree constructed from it need to fit in, so you can't work on very large individual XML files. You may be able to use a streaming XML parser instead to extract elements from the data in a streaming fashion, without ever materializing the whole tree. http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader is one example. Matei

On Mar 18, 2014, at 7:49 AM, Diana Carroll dcarr...@cloudera.com wrote: Well, if anyone is still following this, I've gotten the following code working which in theory should allow me to parse whole XML files: (the problem was that I can't return the tree iterator directly. I have to call iter(). Why?)
import xml.etree.ElementTree as ET

# two source files, format: <data><country name="..."> ... </country></data>
mydata = sc.textFile("file:/home/training/countries*.xml")

def parsefile(iterator):
    s = ''
    for i in iterator: s = s + str(i)
    tree = ET.fromstring(s)
    treeiterator = tree.getiterator("country")
    # why do I have to convert an iterator to an iterator? not sure but required
    return iter(treeiterator)

mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()

The output is what I expect: [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}] BUT I'm a bit concerned about the construction of the string s. How big can my file be before converting it to a string becomes problematic?

On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll dcarr...@cloudera.com wrote: Thanks, Matei. In the context of this discussion, it would seem mapPartitions is essential, because it's the only way I'm going to be able to process each file as a whole, in our example of a large number of small XML files which need to be parsed as a whole file because records are not required to be on a single line. The theory makes sense but I'm still utterly lost as to how to implement it. Unfortunately there's only a single example of the use of mapPartitions in any of the Python example programs, which is the log regression example, which I can't run because it requires Python 2.7 and I'm on Python 2.6. (aside: I couldn't find any statement that Python 2.6 is unsupported...is it?) I'd really really love to see a real life example of a Python use of mapPartitions. I do appreciate the very simple examples
Re: example of non-line oriented input data?
Actually, thinking more on this question, Matei: I'd definitely say support for Avro. There's a lot of interest in this!!

On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

BTW one other thing -- in your experience, Diana, which non-text InputFormats would be most useful to support in Python first? Would it be Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or something else? I think a per-file text input format that does the stuff we did here would also be good.

Matei

On Mar 18, 2014, at 3:27 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Hi Diana,

This seems to work without the iter() in front if you just return treeiterator. What happened when you didn't include that? Treeiterator should return an iterator.

Anyway, this is a good example of mapPartitions. It's one where you want to view the whole file as one object (one XML document here), so you couldn't implement this using a flatMap, but you still want to return multiple values. The MLlib example you saw needs Python 2.7 because unfortunately that is a requirement for our Python MLlib support (see http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries). We'd like to relax this later, but we're using some newer features of NumPy and Python. The rest of PySpark works on 2.6.

In terms of the size in memory, here both the string s and the XML tree constructed from it need to fit, so you can't work on very large individual XML files. You may be able to use a streaming XML parser instead to extract elements from the data in a streaming fashion, without ever materializing the whole tree. http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader is one example.

Matei

On Mar 18, 2014, at 7:49 AM, Diana Carroll dcarr...@cloudera.com wrote:

Well, if anyone is still following this, I've gotten the following code working which in theory should allow me to parse whole XML files: (the problem was that I can't return the tree iterator directly. I have to call iter(). Why?)

    import xml.etree.ElementTree as ET

    # two source files, format: <data><country name="...">...</country></data>
    mydata = sc.textFile("file:/home/training/countries*.xml")

    def parsefile(iterator):
        s = ''
        for i in iterator:
            s = s + str(i)
        tree = ET.fromstring(s)
        treeiterator = tree.getiterator("country")
        # why do I have to convert an iterator to an iterator? not sure, but required
        return iter(treeiterator)

    mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()

The output is what I expect:

    [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]

BUT I'm a bit concerned about the construction of the string s. How big can my file be before converting it to a string becomes problematic?

On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll dcarr...@cloudera.com wrote:

Thanks, Matei. In the context of this discussion, it would seem mapPartitions is essential, because it's the only way I'm going to be able to process each file as a whole, in our example of a large number of small XML files which need to be parsed as whole files because records are not required to be on a single line.

The theory makes sense but I'm still utterly lost as to how to implement it. Unfortunately there's only a single example of the use of mapPartitions in any of the Python example programs, which is the log regression example, which I can't run because it requires Python 2.7 and I'm on Python 2.6. (aside: I couldn't find any statement that Python 2.6 is unsupported...is it?)
I'd really really love to see a real life example of a Python use of mapPartitions. I do appreciate the very simple examples you provided, but (perhaps because of my novice status on Python) I can't figure out how to translate those to a real world situation in which I'm building RDDs from files, not inline collections like [(1,2),(2,3)].

Also, you say that the function called in mapPartitions can return a collection OR an iterator. I tried returning an iterator by calling ElementTree's getiterator function, but still got the error telling me my object was not an iterator. If anyone has a real life example of mapPartitions returning a Python iterator, that would be fabulous.

Diana

On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Oh, I see, the problem is that the function you pass to mapPartitions must itself return an iterator or a collection. This is used so that you can return multiple output records for each input record. You can implement most of the existing map-like operations in Spark, such as map, filter, flatMap, etc., with mapPartitions, as well as new ones that might do a sliding window over each partition, for example, or accumulate data across elements (e.g. to compute a sum). For example, if you have data = sc.parallelize([1, 2, 3, 4], 2
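Since the question of returning a real Python iterator from mapPartitions comes up repeatedly in this thread, here is a minimal sketch (not from the thread itself) that does so with a generator function, run against the same sc.parallelize([1, 2, 3, 4], 2) example Matei uses:

    # Sketch only: assumes a pyspark shell with sc available.
    data = sc.parallelize([1, 2, 3, 4], 2)   # two partitions: [1, 2] and [3, 4]

    def double_elements(iterator):
        # A generator function is itself an iterator, so it satisfies
        # mapPartitions' requirement without any extra iter() call.
        for x in iterator:
            yield x * 2

    print data.mapPartitions(double_elements).collect()   # [2, 4, 6, 8]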
Re: example of non-line oriented input data?
Well, if anyone is still following this, I've gotten the following code working which in theory should allow me to parse whole XML files: (the problem was that I can't return the tree iterator directly. I have to call iter(). Why?)

    import xml.etree.ElementTree as ET

    # two source files, format: <data><country name="...">...</country></data>
    mydata = sc.textFile("file:/home/training/countries*.xml")

    def parsefile(iterator):
        s = ''
        for i in iterator:
            s = s + str(i)
        tree = ET.fromstring(s)
        treeiterator = tree.getiterator("country")
        # why do I have to convert an iterator to an iterator? not sure, but required
        return iter(treeiterator)

    mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()

The output is what I expect:

    [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]

BUT I'm a bit concerned about the construction of the string s. How big can my file be before converting it to a string becomes problematic?

On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll dcarr...@cloudera.com wrote:

Thanks, Matei. In the context of this discussion, it would seem mapPartitions is essential, because it's the only way I'm going to be able to process each file as a whole, in our example of a large number of small XML files which need to be parsed as whole files because records are not required to be on a single line.

The theory makes sense but I'm still utterly lost as to how to implement it. Unfortunately there's only a single example of the use of mapPartitions in any of the Python example programs, which is the log regression example, which I can't run because it requires Python 2.7 and I'm on Python 2.6. (aside: I couldn't find any statement that Python 2.6 is unsupported...is it?)

I'd really really love to see a real life example of a Python use of mapPartitions. I do appreciate the very simple examples you provided, but (perhaps because of my novice status on Python) I can't figure out how to translate those to a real world situation in which I'm building RDDs from files, not inline collections like [(1,2),(2,3)].

Also, you say that the function called in mapPartitions can return a collection OR an iterator. I tried returning an iterator by calling ElementTree's getiterator function, but still got the error telling me my object was not an iterator. If anyone has a real life example of mapPartitions returning a Python iterator, that would be fabulous.

Diana

On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Oh, I see, the problem is that the function you pass to mapPartitions must itself return an iterator or a collection. This is used so that you can return multiple output records for each input record. You can implement most of the existing map-like operations in Spark, such as map, filter, flatMap, etc., with mapPartitions, as well as new ones that might do a sliding window over each partition, for example, or accumulate data across elements (e.g. to compute a sum). For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will work:

    data.mapPartitions(lambda x: x).collect()
    [1, 2, 3, 4]       # Just return the same iterator, doing nothing

    data.mapPartitions(lambda x: [list(x)]).collect()
    [[1, 2], [3, 4]]   # Group together the elements of each partition in a single list (like glom)

    data.mapPartitions(lambda x: [sum(x)]).collect()
    [3, 7]             # Sum each partition separately

However, something like data.mapPartitions(lambda x: sum(x)).collect() will *not* work, because sum returns a number, not an iterator. That's why I put sum(x) inside a list above.
In practice mapPartitions is most useful if you want to share some data or work across the elements. For example, maybe you want to load a lookup table once from an external file and then check each element against it, or sum up a bunch of elements without allocating a lot of vector objects.

Matei

On Mar 17, 2014, at 11:25 AM, Diana Carroll dcarr...@cloudera.com wrote:

There's also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that.

I confess, I was hoping for an example of just that, because I've not yet been able to figure out how to use mapPartitions. No doubt this is because I'm a rank newcomer to Python, and haven't fully wrapped my head around iterators. All I get so far in my attempts to use mapPartitions is the darned "suchnsuch is not an iterator" error.

    def myfunction(iterator):
        return [1, 2, 3]

    mydata.mapPartitions(lambda x: myfunction(x)).take(2)

On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Here's an example of getting together all lines in a file as one string:

    $ cat dir/a.txt
    Hello
    world!

    $ cat dir/b.txt
    What's
    up??

    $ bin/pyspark
    files = sc.textFile("dir")

    files.collect()
    [u'Hello', u'world!', u"What's
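To make the "load a lookup table once per partition" pattern above concrete, here is a minimal sketch. It is not code from the thread: the /home/training/lookup.txt path and its comma-separated key,value format are hypothetical, and mydata is assumed to be an RDD of keys.

    # Sketch only: the lookup file, its format, and mydata are assumptions.
    def tag_with_lookup(iterator):
        # The file is opened and parsed once per partition, not once per element.
        lookup = {}
        with open('/home/training/lookup.txt') as f:
            for line in f:
                key, value = line.strip().split(',', 1)
                lookup[key] = value
        for element in iterator:
            yield (element, lookup.get(element, 'unknown'))

    tagged = mydata.mapPartitions(tag_with_lookup)

Note that the lookup file would need to be readable on every worker node, for example by distributing it with SparkContext.addFile or placing it on a shared filesystem.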
Re: example of non-line oriented input data?
There's also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that.

I confess, I was hoping for an example of just that, because I've not yet been able to figure out how to use mapPartitions. No doubt this is because I'm a rank newcomer to Python, and haven't fully wrapped my head around iterators. All I get so far in my attempts to use mapPartitions is the darned "suchnsuch is not an iterator" error.

    def myfunction(iterator):
        return [1, 2, 3]

    mydata.mapPartitions(lambda x: myfunction(x)).take(2)

On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Here's an example of getting together all lines in a file as one string:

    $ cat dir/a.txt
    Hello
    world!

    $ cat dir/b.txt
    What's
    up??

    $ bin/pyspark
    files = sc.textFile("dir")

    files.collect()
    [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not what we want

    files.glom().collect()
    [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines

    files.glom().map(lambda a: "\n".join(a)).collect()
    [u'Hello\nworld!', u"What's\nup??"]   # join back each file into a single string

The glom() method groups all the elements of each partition of an RDD into an array, giving you an RDD of arrays of objects. If your input is small files, you always have one partition per file. There's also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that.

Matei

On Mar 17, 2014, at 10:46 AM, Diana Carroll dcarr...@cloudera.com wrote:

Thanks Matei. That makes sense. I have here a dataset of many many smallish XML files, so using mapPartitions that way would make sense. I'd love to see a code example though...it's not as obvious to me how to do that as it probably should be.

Thanks,
Diana

On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Hi Diana,

Non-text input formats are only supported in Java and Scala right now, where you can use sparkContext.hadoopFile or .hadoopDataset to load data with any InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only have textFile, which gives you one record per line. For JSON, you'd have to fit the whole JSON object on one line as you said. Hopefully we'll also have some other forms of input soon.

If your input is a collection of separate files (say many .xml files), you can also use mapPartitions on it to group together the lines, because each input file will end up being a single dataset partition (or map task). This will let you concatenate the lines in each file and parse them as one XML object.

Matei

On Mar 17, 2014, at 9:52 AM, Diana Carroll dcarr...@cloudera.com wrote:

Thanks, Krakna, very helpful. The way I read the code, it looks like you are assuming that each line in foo.log contains a complete JSON object? (That is, that the data doesn't contain any records that are split into multiple lines.) If so, is that because you know that to be true of your data? Or did you do as Nicholas suggests and have some preprocessing on the text input to flatten the data in that way?

Thanks,
Diana

On Mon, Mar 17, 2014 at 12:09 PM, Krakna H shankark+...@gmail.com wrote:

Katrina,

Not sure if this is what you had in mind, but here's some simple pyspark code that I recently wrote to deal with JSON files.
    from pyspark import SparkContext, SparkConf
    from operator import add
    import json
    import random
    import numpy as np

    def concatenate_paragraphs(sentence_array):
        return ' '.join(sentence_array).split(' ')

    logFile = 'foo.json'
    conf = SparkConf()
    conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
    sc = SparkContext(conf=conf)

    logData = sc.textFile(logFile).cache()
    num_lines = logData.count()
    print 'Number of lines: %d' % num_lines

    # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
    tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
    tm = tm.reduceByKey(lambda _, x: _ + x)
    op = tm.collect()
    for key, num_words in op:
        print 'state: %s, num_words: %d' % (key, num_words)

On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] [hidden email] wrote:

I don't actually have any data. I'm writing a course that teaches students how to do this sort of thing and am interested in looking at a variety of real life examples of people doing things like that. I'd love to see some working code implementing the obvious work-around you mention...do you have any to share? It's an approach that makes a lot
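As a complement to the per-line JSON code above, here is a minimal sketch of the glom() approach Matei describes, for the case where each small file holds one multi-line JSON document rather than one document per line. The path and the 'key' field are assumptions for illustration, not code from the thread:

    # Sketch only: assumes a pyspark shell (sc available), many small files each
    # containing a single multi-line JSON document, and one partition per small file.
    import json

    files = sc.textFile("file:/home/training/kb/*.json")   # hypothetical path

    # glom() turns each partition (here, each small file) into a list of its lines;
    # join the lines back together and parse each file as one JSON document.
    docs = files.glom().map(lambda lines: json.loads('\n'.join(lines)))
    print docs.map(lambda d: d.get('key')).collect()   # 'key' field is hypothetical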