Spark SQL: preferred syntax for column reference?

2015-05-13 Thread Diana Carroll
I'm just getting started with Spark SQL and DataFrames in 1.3.0.

I notice that the Spark API shows a different syntax for referencing
columns in a dataframe than the Spark SQL Programming Guide.

For instance, the API docs for the select method show this:
df.select($"colA", $"colB")


Whereas the programming guide shows this:
df.filter(df("name") > 21).show()

I tested and both the $"column" and df("column") syntaxes work, but I'm
wondering which is *preferred*.  Is one the original and one a new feature
we should be using?
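
For reference, here's a minimal sketch of the two forms I'm comparing, as I
understand them, assuming spark-shell 1.3.0 and a made-up DataFrame with
name and age columns:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._   // enables toDF() and the $"colName" syntax

val df = sc.parallelize(Seq(("Alice", 34), ("Bob", 19))).toDF("name", "age")

df.select($"name", $"age").show()   // $-syntax, as in the API docs
df.filter(df("age") > 21).show()    // df("col") syntax, as in the programming guide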

Thanks,
Diana
(Spark Curriculum Developer for Cloudera)


Spark 1.0 docs out of sync?

2014-06-30 Thread Diana Carroll
I'm hoping someone can clear up some confusion for me.

When I view the Spark 1.0 docs online (http://spark.apache.org/docs/1.0.0/)
they are different than the docs which are packaged with the Spark 1.0.0
download (spark-1.0.0.tgz).

In particular, in the online docs, there's a single merged Spark
Programming Guide
[image: Inline image 1]
Whereas in the docs in the download package there are still three separate
guides:
[image: Inline image 2]

Plus there are several other differences: the color scheme is different
(orange vs. blue), and there are several content differences.  The first
one is on the Overview page, e.g.
"Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). All
you need to run it is to have `java` to installed on your system `PATH`"
vs.
"Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS).
It's easy to run locally on one machine --- all you need is to have `java`
installed on your system `PATH`"


Can someone clarify?  And more importantly, where can I download the
*official* 1.0 docs to build locally?

Thanks!
Diana


Re: logging in pyspark

2014-05-14 Thread Diana Carroll
foreach vs. map isn't the issue.  Both require serializing the called
function, so the pickle error would still apply, yes?

And at the moment, I'm just testing.  Definitely wouldn't want to log
something for each element, but may want to detect something and log for
SOME elements.

So my question is: how are other people doing logging from distributed
tasks, given the serialization issues?

The same issue actually exists in Scala, too.  I could work around it by
creating a small serializable object that provides a logger, but it seems
kind of kludgy to me, so I'm wondering if other people are logging from
tasks, and if so, how?
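
For the record, the Scala workaround I have in mind looks roughly like this
(a sketch only; the log4j usage and logger name are my own choices, not
anything official):

import org.apache.log4j.Logger

object TaskLogger extends Serializable {
  // @transient lazy: the Logger itself is never shipped from the driver;
  // each executor JVM creates its own instance on first use.
  @transient lazy val log = Logger.getLogger("my.task.logger")
}

myrdd.map { x =>
  TaskLogger.log.info("processing an element")
  x
}.count()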

Diana


On Tue, May 6, 2014 at 6:24 PM, Nicholas Chammas nicholas.cham...@gmail.com
 wrote:

 I think you're looking for RDD.foreach()
 (http://spark.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#foreach).

 According to the programming guide
 (http://spark.apache.org/docs/latest/scala-programming-guide.html):

 Run a function func on each element of the dataset. This is usually done
 for side effects such as updating an accumulator variable (see below) or
 interacting with external storage systems.


 Do you really want to log something for each element of your RDD?

 Nick


 On Tue, May 6, 2014 at 3:31 PM, Diana Carroll dcarr...@cloudera.comwrote:

 What should I do if I want to log something as part of a task?

 This is what I tried.  To set up a logger, I followed the advice here:
 http://py4j.sourceforge.net/faq.html#how-to-turn-logging-on-off

 logger = logging.getLogger("py4j")
 logger.setLevel(logging.INFO)
 logger.addHandler(logging.StreamHandler())

 This works fine when I call it from my driver (ie pyspark):
 logger.info("this works fine")

 But I want to try logging within a distributed task so I did this:

 def logTestMap(a):
     logger.info("test")
     return a

 myrdd.map(logTestMap).count()

 and got:
 PicklingError: Can't pickle 'lock' object

 So it's trying to serialize my function and can't because of a lock
 object used in logger, presumably for thread-safeness.  But then...how
 would I do it?  Or is this just a really bad idea?

 Thanks
 Diana





NotSerializableException in Spark Streaming

2014-05-14 Thread Diana Carroll
Hey all, trying to set up a pretty simple streaming app and getting some
weird behavior.

First, a non-streaming job that works fine:  I'm trying to pull out lines
of a log file that match a regex, for which I've set up a function:

def getRequestDoc(s: String): String = { "KBDOC-[0-9]*".r.findFirstIn(s).orNull }
val logs = sc.textFile("logfiles")
logs.map(getRequestDoc).take(10)

That works, but I want to run that on the same data, but streaming, so I
tried this:

val logs = ssc.socketTextStream("localhost", <port>)
logs.map(getRequestDoc).print()
ssc.start()

From this code, I get:
14/05/08 03:32:08 ERROR JobScheduler: Error running job streaming job
1399545128000 ms.0
org.apache.spark.SparkException: Job aborted: Task not serializable:
java.io.NotSerializableException:
org.apache.spark.streaming.StreamingContext


But if I do the map function inline instead of calling a separate function,
it works:

logs.map("KBDOC-[0-9]*".r.findFirstIn(_).orNull).print()

So why is it able to serialize my little function in regular spark, but not
in streaming?
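
In case it helps anyone else who hits this: since the inline form works, the
compromise I'm experimenting with is to keep the logic in a lambda but hoist
the compiled regex into a val so it isn't rebuilt per line.  This is only a
sketch (the port number is just an example), and I haven't confirmed it's
the "right" fix:

val docPattern = "KBDOC-[0-9]*".r   // compiled once, captured by the closure

val logs = ssc.socketTextStream("localhost", 4444)   // example port
logs.map(line => docPattern.findFirstIn(line).orNull).print()
ssc.start()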

Thanks,
Diana


logging in pyspark

2014-05-06 Thread Diana Carroll
What should I do if I want to log something as part of a task?

This is what I tried.  To set up a logger, I followed the advice here:
http://py4j.sourceforge.net/faq.html#how-to-turn-logging-on-off

logger = logging.getLogger("py4j")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

This works fine when I call it from my driver (ie pyspark):
logger.info("this works fine")

But I want to try logging within a distributed task so I did this:

def logTestMap(a):
    logger.info("test")
    return a

myrdd.map(logTestMap).count()

and got:
PicklingError: Can't pickle 'lock' object

So it's trying to serialize my function and can't because of a lock object
used in logger, presumably for thread-safeness.  But then...how would I do
it?  Or is this just a really bad idea?

Thanks
Diana


Re: performance improvement on second operation...without caching?

2014-05-05 Thread Diana Carroll
 to be updated?


 On Fri, May 2, 2014 at 9:04 AM, Diana Carroll dcarr...@cloudera.comwrote:

 I'm just Posty McPostalot this week, sorry folks! :-)

 Anyway, another question today:
 I have a bit of code that is pretty time consuming (pasted at the end
 of the message):
 It reads in a bunch of XML files, parses them, extracts some data in a
 map, counts (using reduce), and then sorts.   All stages are executed when
 I do a final operation (take).  The first stage is the most expensive: on
 first run it takes 30s to a minute.

 I'm not caching anything.

 When I re-execute that take at the end, I expected it to re-execute
 all the same stages, and take approximately the same amount of time, but 
 it
 didn't.  The second take executes only a single stage which collectively
 run very fast: the whole operation takes less than 1 second (down from 5
 minutes!)

 While this is awesome (!) I don't understand it.  If I'm not caching
 data, why would I see such a marked performance improvement on subsequent
 execution?

 (or is this related to the known 0.9.1 bug about sortByKey executing an
 action when it shouldn't?)

 Thanks,
 Diana

 # load XML files containing device activation records.
 # Find the most common device models activated
 import xml.etree.ElementTree as ElementTree

 # Given a partition containing multi-line XML, parse the contents.
 # Return an iterator of activation Elements contained in the partition
 def getactivations(fileiterator):
     s = ''
     for i in fileiterator: s = s + str(i)
     filetree = ElementTree.fromstring(s)
     return filetree.getiterator('activation')

 # Get the model name from a device activation record
 def getmodel(activation):
     return activation.find('model').text

 filename = "hdfs://localhost/user/training/activations/*.xml"

 # parse each partition as a file into an activation XML record
 activations = sc.textFile(filename)
 activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
 models = activationTrees.map(lambda activation: getmodel(activation))

 # count and sort activations by model
 topmodels = models.map(lambda model: (model,1))\
     .reduceByKey(lambda v1,v2: v1+v2)\
     .map(lambda (model,count): (count,model))\
     .sortByKey(ascending=False)

 # display the top 10 models
 for (count,model) in topmodels.take(10):
     print "Model %s (%s)" % (model,count)

 # repeat!
 for (count,model) in topmodels.take(10):
     print "Model %s (%s)" % (model,count)










when to use broadcast variables

2014-05-02 Thread Diana Carroll
Anyone have any guidance on using a broadcast variable to ship data to
workers vs. an RDD?

Like, say I'm joining web logs in an RDD with user account data.  I could
keep the account data in an RDD or if it's small, a broadcast variable
instead.  How small is small?  Small enough that I know it can easily fit
in memory on a single node?  Some other guideline?
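
For concreteness, here's a rough sketch of the broadcast approach I'm
describing, with made-up field names and data:

case class LogEntry(userId: String, url: String)

// small lookup table: user id -> account type (made up)
val accounts: Map[String, String] = Map("u1" -> "basic", "u2" -> "premium")
val accountsBc = sc.broadcast(accounts)

val weblogs = sc.parallelize(Seq(LogEntry("u1", "/home"), LogEntry("u2", "/buy")))

// "map-side join": look up each log entry in the broadcast table on the workers
val joined = weblogs.map(e =>
  (e.userId, e.url, accountsBc.value.getOrElse(e.userId, "unknown")))
joined.collect().foreach(println)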

Thanks!

Diana


Re: the spark configuage

2014-04-30 Thread Diana Carroll
I'm guessing your shell stopping when it attempts to connect to the RM is
not related to that warning.  You'll get that message out of the box from
Spark if you don't have HADOOP_HOME set correctly.  I'm using CDH 5.0
installed in default locations, and got rid of the warning by setting
HADOOP_HOME to /usr/lib/hadoop.  The stopping issue might be something
unrelated.

Diana


On Wed, Apr 30, 2014 at 3:58 AM, Sophia sln-1...@163.com wrote:

 Hi,
 when I configure spark and run the shell command ./spark-shell, it tells me:
 WARN NativeCodeLoader: Unable to load native-hadoop library for your
 platform... using builtin-java classes where applicable
 When it connects to the ResourceManager, it stops. What should I do?
 Wish your reply



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



running SparkALS

2014-04-28 Thread Diana Carroll
Hi everyone.  I'm trying to run some of the Spark example code, and most of
it appears to be undocumented (unless I'm missing something).  Can someone
help me out?

I'm particularly interested in running SparkALS, which wants parameters:
M U F iter slices

What are these variables?  They appear to be integers and the default
values are 100, 500 and 10 respectively but beyond that...huh?

Thanks!

Diana


Re: running SparkALS

2014-04-28 Thread Diana Carroll
Thanks, Deb.  But I'm looking at  org.apache.spark.examples.SparkALS, which
is not in the mllib examples, and does not take any file parameters.

I don't see the class you refer to in the examples ...however, if I did
want to run that example, where would I find the file in question?

It would be great if this were documented, perhaps in the source code.
 I'll add a JIRA.
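
(In the meantime, for anyone who just wants to try ALS programmatically
rather than through an example's command line, here's a rough sketch using
the MLlib API, with arbitrary rank/iterations/lambda values and a
hypothetical input path and format:)

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// assume one "user,product,rating" triple per line (hypothetical file)
val ratings = sc.textFile("hdfs:///path/to/ratings.csv").map { line =>
  val Array(user, product, rating) = line.split(',')
  Rating(user.toInt, product.toInt, rating.toDouble)
}

val model = ALS.train(ratings, 10, 10, 0.01)   // rank, iterations, lambda
val predicted = model.predict(1, 100)          // predicted rating of product 100 by user 1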

Thanks,
Diana


On Mon, Apr 28, 2014 at 1:41 PM, Debasish Das debasish.da...@gmail.comwrote:

 Diana,

 Here are the parameters:

 ./bin/spark-class org.apache.spark.mllib.recommendation.ALS
 Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir>
 [<lambda>] [<implicitPrefs>] [<alpha>] [<blocks>]

 Master: Local/Deployed spark cluster master
 ratings_file: Netflix format data
 rank: Reduced dimension of the User and Product factors
 iterations: How many ALS iterations you would like to run
 output_dir: Where to generate the user and product factors

 lambda: regularization for nuclear norm
 implicitPrefs: true will run Koren's netflix prize paper's implicit
 algorithm
 alpha: is valid for implicitPrefs
 blocks: How many blocks you want to partition your rating, user and
 product factor matrix

 I have run with 64 GB JVM with 20M users, 1.6M ratings and 50
 factorsyou should be able to go even beyond that if you want to
 increase the JVM size...

 The scalability issue comes from the fact that each JVM has to collect
 either user/product factors before doing a BLAS posv solveseems like
 that's the bottleneck step...but making double to float is one way to scale
 it even further...

 Thanks.
 Deb



 On Mon, Apr 28, 2014 at 10:30 AM, Diana Carroll dcarr...@cloudera.comwrote:

 Hi everyone.  I'm trying to run some of the Spark example code, and most
 of it appears to be undocumented (unless I'm missing something).  Can
 someone help me out?

 I'm particularly interested in running SparkALS, which wants parameters:
 M U F iter slices

 What are these variables?  They appear to be integers and the default
 values are 100, 500 and 10 respectively but beyond that...huh?

 Thanks!

 Diana





Re: running SparkALS

2014-04-28 Thread Diana Carroll
Should I file a JIRA to remove the example?  I think it is confusing to
include example code without explanation of how to run it, and it sounds
like this one isn't worth running or reviewing anyway.




On Mon, Apr 28, 2014 at 2:34 PM, Debasish Das debasish.da...@gmail.comwrote:

 Don't use SparkALS...that's the first version of the code and does not
 scale...

 Li is right...you have to do the dictionary generation on users, products
 and then generate indexed fileI wrote some utilities but looks like it
 is application dependentthe indexed netflix format is more generic...


 On Mon, Apr 28, 2014 at 10:47 AM, Diana Carroll dcarr...@cloudera.comwrote:

 Thanks, Deb.  But I'm looking at  org.apache.spark.examples.SparkALS,
 which is not in the mllib examples, and does not take any file parameters.

 I don't see the class you refer to in the examples ...however, if I did
 want to run that example, where would I find the file in question?

 It would be great if this were documented, perhaps in the source code.
  I'll add a JIRA.

 Thanks,
 Diana


 On Mon, Apr 28, 2014 at 1:41 PM, Debasish Das 
 debasish.da...@gmail.comwrote:

 Diana,

 Here are the parameters:

 ./bin/spark-class org.apache.spark.mllib.recommendation.ALS
 Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir>
 [<lambda>] [<implicitPrefs>] [<alpha>] [<blocks>]

 Master: Local/Deployed spark cluster master
 ratings_file: Netflix format data
 rank: Reduced dimension of the User and Product factors
 iterations: How many ALS iterations you would like to run
 output_dir: Where to generate the user and product factors

 lambda: regularization for nuclear norm
 implicitPrefs: true will run Koren's netflix prize paper's implicit
 algorithm
 alpha: is valid for implicitPrefs
 blocks: How many blocks you want to partition your rating, user and
 product factor matrix

 I have run with 64 GB JVM with 20M users, 1.6M ratings and 50
 factorsyou should be able to go even beyond that if you want to
 increase the JVM size...

 The scalability issue comes from the fact that each JVM has to collect
 either user/product factors before doing a BLAS posv solveseems like
 that's the bottleneck step...but making double to float is one way to scale
 it even further...

 Thanks.
 Deb



 On Mon, Apr 28, 2014 at 10:30 AM, Diana Carroll 
 dcarr...@cloudera.comwrote:

 Hi everyone.  I'm trying to run some of the Spark example code, and
 most of it appears to be undocumented (unless I'm missing something).  Can
 someone help me out?

 I'm particularly interested in running SparkALS, which wants parameters:
 M U F iter slices

 What are these variables?  They appear to be integers and the default
 values are 100, 500 and 10 respectively but beyond that...huh?

 Thanks!

 Diana







checkpointing without streaming?

2014-04-21 Thread Diana Carroll
I'm trying to understand when I would want to checkpoint an RDD rather than
just persist to disk.

Every reference I can find to checkpoint related to Spark Streaming.  But
the method is defined in the core Spark library, not Streaming.

Does it exist solely for streaming, or are there circumstances unrelated to
streaming in which I might want to checkpoint...and if so, like what?

Thanks,
Diana


Re: checkpointing without streaming?

2014-04-21 Thread Diana Carroll
When might that be necessary or useful?  Presumably I can persist and
replicate my RDD to avoid re-computation, if that's my goal.  What
advantage  does checkpointing provide over disk persistence with
replication?
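
(As Xiangrui notes below, the main use is cutting a long lineage in
iterative algorithms.  Here's a rough sketch of that pattern as I understand
it, with a made-up checkpoint directory and loop:)

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // hypothetical path

var current = sc.parallelize(1 to 1000000)
for (i <- 1 to 50) {
  current = current.map(_ + 1)
  if (i % 10 == 0) {
    current.checkpoint()   // marks the RDD; lineage is truncated once it's materialized
    current.count()        // force an action so the checkpoint actually happens
  }
}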


On Mon, Apr 21, 2014 at 2:42 PM, Xiangrui Meng men...@gmail.com wrote:

 Checkpoint clears dependencies. You might need checkpoint to cut a
 long lineage in iterative algorithms. -Xiangrui

 On Mon, Apr 21, 2014 at 11:34 AM, Diana Carroll dcarr...@cloudera.com
 wrote:
  I'm trying to understand when I would want to checkpoint an RDD rather
 than
  just persist to disk.
 
  Every reference I can find to checkpoint related to Spark Streaming.  But
  the method is defined in the core Spark library, not Streaming.
 
  Does it exist solely for streaming, or are there circumstances unrelated
 to
  streaming in which I might want to checkpoint...and if so, like what?
 
  Thanks,
  Diana



partitioning of small data sets

2014-04-15 Thread Diana Carroll
I loaded a very tiny file into Spark -- 23 lines of text, 2.6kb

Given the size, and that it is a single file, I assumed it would only be in
a single partition.  But when I cache it,  I can see in the Spark App UI
that it actually splits it into two partitions:

[image: Inline image 1]

Is this correct behavior?  How does Spark decide how big a partition should
be, or how many partitions to create for an RDD.

If it matters, I have only a single worker in my cluster, so both
partitions are stored on the same worker.

The file was on HDFS and was only a single block.
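
(In case it's useful: I believe the second argument to textFile sets the
minimum number of partitions, and the default asks for at least 2 even for a
tiny file, which would explain what I'm seeing.  A sketch, with a
hypothetical path:)

val oneSplit = sc.textFile("hdfs://localhost/user/training/tinyfile.txt", 1)  // minSplits = 1
println(oneSplit.partitions.length)   // should print 1 for a single small block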

Thanks for any insight.

Diana

using Kryo with pyspark?

2014-04-14 Thread Diana Carroll
I'm looking at the Tuning Guide suggestion to use Kryo instead of default
serialization.  My questions:

Does pyspark use Java serialization by default, as Scala spark does?  If
so, then...
can I use Kryo with pyspark instead?  The instructions say I should
register my classes with the Kryo Serialization, but that's in Java/Scala.
 If I simply set the spark.serializer variable for my SparkContext, will it
at least use Kryo for Spark's own classes, even if I can't register any of
my own classes?
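
(For reference, this is roughly what the Scala-side registration I'm
referring to looks like, as far as I can tell; the class and registrator
names here are hypothetical:)

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

case class MyRecord(id: Int, name: String)   // an application class to register

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyRecord])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")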

Thanks,
Diana


Re: network wordcount example

2014-03-31 Thread Diana Carroll
Not sure what data you are sending in.  You could try calling
lines.print() instead which should just output everything that comes in
on the stream.  Just to test that your socket is receiving what you think
you are sending.


On Mon, Mar 31, 2014 at 12:18 PM, eric perler ericper...@hotmail.comwrote:

 Hello

 i just started working with spark today... and i am trying to run the
 wordcount network example

 i created a socket server and client.. and i am sending data to the server
 in an infinite loop

 when i run the spark class.. i see this output in the console...

 ---
 Time: 1396281891000 ms
 ---

 14/03/31 11:04:51 INFO SparkContext: Job finished: take at
 DStream.scala:586, took 0.056794606 s
 14/03/31 11:04:51 INFO JobScheduler: Finished job streaming job
 1396281891000 ms.0 from job set of time 1396281891000 ms
 14/03/31 11:04:51 INFO JobScheduler: Total delay: 0.101 s for time
 1396281891000 ms (execution: 0.058 s)
 14/03/31 11:04:51 INFO TaskSchedulerImpl: Remove TaskSet 3.0 from pool

 but i dont see any output from the workcount operation when i make this
 call...

 wordCounts.print();

 any help is greatly appreciated

 thanks in advance



streaming: code to simulate a network socket data source

2014-03-28 Thread Diana Carroll
If you are learning about Spark Streaming, as I am, you've probably use
netcat nc as mentioned in the spark streaming programming guide.  I
wanted something a little more useful, so I modified the
ClickStreamGenerator code to make a very simple script that simply reads a
file off disk and passes it to a socket, character by character.  You
specify the port, filename, and bytesPerSecond that you want it to send.

Thought someone else might find this helpful, so here it is.

import java.net.ServerSocket
import java.io.PrintWriter
import scala.io.Source

object StreamingDataGenerator {

  def main(args : Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: StreamingDataGenerator port file bytesPerSecond")
      System.exit(1)
    }
    val port = args(0).toInt
    val file = Source.fromFile(args(1))
    val bytesPerSecond = args(2).toFloat

    val sleepDelayMs = (1000.0 / bytesPerSecond).toInt
    val listener = new ServerSocket(port)

    println("Reading from file: " + file.descr)

    while (true) {
      println("Listening on port: " + port)
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connect from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)

          file.foreach(c =>
            {
              Thread.sleep(sleepDelayMs)
              // write the byte to the socket
              out.write(c)
              out.flush()
              // also print the byte to stdout, for debugging ease
              print(c)
            }
          )
          socket.close()
        }
      }.start()
    }
  }
}


spark streaming: what is awaitTermination()?

2014-03-27 Thread Diana Carroll
The API docs for ssc.awaitTermination say simply "Wait for the execution to
stop. Any exceptions that occurs during the execution will be thrown in
this thread."

Can someone help me understand what this means?  What causes execution to
stop?  Why do we need to wait for that to happen?

I tried removing it from my simple NetworkWordCount example (running
locally, not on a cluster) and nothing changed.  In both cases, I end my
program by hitting Ctrl-C.
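
(For context, my understanding of the usual standalone-app pattern is
roughly the sketch below; in the shell the process stays alive anyway, which
may be why removing the call changed nothing for me.)

ssc.start()              // non-blocking: starts receivers and batch jobs
ssc.awaitTermination()   // blocks this thread until ssc.stop() or a failure,
                         // and rethrows streaming-job exceptions here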

Thanks for any insight you can give me.

Diana


spark streaming and the spark shell

2014-03-27 Thread Diana Carroll
I'm working with spark streaming using spark-shell, and hoping folks could
answer a few questions I have.

I'm doing WordCount on a socket stream:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds
var ssc = new StreamingContext(sc,Seconds(5))
var mystream = ssc.socketTextStream("localhost", <port>)
var words = mystream.flatMap(line => line.split(" "))
var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
wordCounts.print()
ssc.start()



1.  I'm assuming that using spark shell is an edge case, and that spark
streaming is really intended mostly for batch use.  True?

2.   I notice that once I start ssc.start(), my stream starts processing
and continues indefinitely...even if I close the socket on the server end
(I'm using the unix command "nc" to mimic a server, as explained in the
streaming programming guide.)  Can I tell my stream to detect if it's lost
a connection and therefore stop executing?  (Or even better, to attempt to
re-establish the connection?)

3.  I tried entering ssc.stop which resulted in an error:

Exception in thread Thread-43 org.apache.spark.SparkException: Job
cancelled because SparkContext was shut down
14/03/27 07:36:13 ERROR ConnectionManager: Corresponding
SendingConnectionManagerId not found

But it did stop the DStream execution.

4.  Then I tried restarting the ssc again (ssc.start) and got another error:
org.apache.spark.SparkException: JobScheduler already started
Is restarting an ssc supported?

5.  When I perform an operation like wordCounts.print(), that operation
will execute on each batch, every n seconds.  Is there a way I can undo
that operation?  That is, I want it to *stop* executing that print every n
seconds...without having to stop the stream.

What I'm really asking is...can I explore DStreams interactively the way I
can explore my data in regular Spark.  In regular Spark, I might perform
various operations on an RDD to see what happens.  So at first, I might
have used split(" ") to tokenize my input text, but now I want to try
using split(",") instead, after the stream has already started running.
 Can I do that?

I did find out that if add a new operation to an existing dstream (say,
words.print()) *after *the ssc.start it works. It *will* add the second
print() call to the execution list every n seconds.

but if I try to add new dstreams, e.g.
...

ssc.start()

var testpairs = words.map(x => (x, "TEST"))
testpairs.print()


I get an error:

14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time
139593227 ms
java.lang.Exception:
org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been
initialized


Is this sort of interactive use just not supported?

Thanks!

Diana


streaming questions

2014-03-26 Thread Diana Carroll
I'm trying to understand Spark streaming, hoping someone can help.

I've kinda-sorta got a version of Word Count running, and it looks like
this:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StreamingWordCount {

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: StreamingWordCount master hostname port")
      System.exit(1)
    }

    val master = args(0)
    val hostname = args(1)
    val port = args(2).toInt

    val ssc = new StreamingContext(master, "Streaming Word Count", Seconds(2))
    val lines = ssc.socketTextStream(hostname, port)
    val words = lines.flatMap(line => line.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

(I also have a small script that sends text to that port.)

*Question 1:*
When I run this, I don't get any output from the wordCounts.print as long
as my data is still streaming.  I have to stop my streaming data script
before my program will display the word counts.

Why is that?  What if my stream is indefinite?  I thought the point of
Streaming was that it would process it in real time?

*Question 2:*
While I run this (and the stream is still sending) I get continuous warning
messages like this:
14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623200 already
exists on this machine; not re-adding it
14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623400 already
exists on this machine; not re-adding it

What does that mean?

*Question 3:*
I tried replacing the wordCounts.print() line with
wordCounts.saveAsTextFiles("file:/my/path/outdir").
This results in a new outdir-<timestamp> file being created every two
seconds...even if there's no data during that time period.  Is there a way
to tell it to save only if there's data?

Thanks!


Re: streaming questions

2014-03-26 Thread Diana Carroll
Thanks, Tathagata, very helpful.  A follow-up question below...


On Wed, Mar 26, 2014 at 2:27 PM, Tathagata Das
tathagata.das1...@gmail.comwrote:



 *Answer 3:* You can do something like
 wordCounts.foreachRDD((rdd: RDD[...], time: Time) => {
   if (rdd.take(1).size == 1) {
     // There exists at least one element in RDD, so save it to file
     rdd.saveAsTextFile(<generate file name based on time>)
   }
 })

 Is calling foreachRDD and performing an operation on each individually as
efficient as performing the operation on the dstream?  Is this foreach
pretty much what dstream.saveAsTextFiles is doing anyway?

This also brings up a question I have about caching in the context of
streaming.  In  this example, would I want to call rdd.cache()?  I'm
calling two successive operations on the same rdd (take(1) and then
saveAsTextFile))...if I were doing this in regular Spark I'd want to cache
so I wouldn't need to re-calculate the rdd for both calls.  Does the same
apply here?
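
(To make my question concrete, this is roughly what I have in mind, with a
made-up output path; I'm not asserting the cache()/unpersist() calls are the
right thing to do, that's exactly my question:)

wordCounts.foreachRDD((rdd, time) => {
  rdd.cache()      // would this help, since take() and saveAsTextFile()
                   // both trigger computation of the same RDD?
  if (rdd.take(1).size == 1) {
    rdd.saveAsTextFile("outdir-" + time.milliseconds)
  }
  rdd.unpersist()
})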

Thanks,
Diana


quick start guide: building a standalone scala program

2014-03-24 Thread Diana Carroll
Has anyone successfully followed the instructions on the Quick Start page
of the Spark home page to run a standalone Scala application?  I can't,
and I figure I must be missing something obvious!

I'm trying to follow the instructions here as close to word for word as
possible:
http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala

1.  The instructions don't say what directory to create my test application
in, but later I'm instructed to run sbt/sbt so I conclude that my working
directory must be $SPARK_HOME.  (Temporarily ignoring that it is a little
weird to be working directly in the Spark distro.)

2.  Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala.
 Copy&paste in the code from the instructions exactly, replacing
YOUR_SPARK_HOME with my spark home path.

3.  Create $SPARK_HOME/mysparktest/simple.sbt.  Copy&paste in the sbt file
from the instructions.

4.  From $SPARK_HOME I run "sbt/sbt package".  It runs through the
ENTIRE Spark project!  This takes several minutes, and at the end, it says
"Done packaging."  Unfortunately, there's nothing in the
$SPARK_HOME/mysparktest/ folder other than what I already had there.

(Just for fun, I also did what I thought was more logical, which is set my
working directory to $SPARK_HOME/mysparktest, and ran $SPARK_HOME/sbt/sbt
package, but that was even less successful: I got an error:
awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for
reading (No such file or directory)
Attempting to fetch sbt
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
directory
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
directory
Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please
install sbt manually from http://www.scala-sbt.org/


So, help?  I'm sure these instructions work because people are following
them every day, but I can't tell what they are supposed to do.

Thanks!
Diana


Re: quick start guide: building a standalone scala program

2014-03-24 Thread Diana Carroll
Yana: Thanks.  Can you give me a transcript of the actual commands you are
running?

Thanks!
Diana


On Mon, Mar 24, 2014 at 3:59 PM, Yana Kadiyska yana.kadiy...@gmail.comwrote:

 I am able to run standalone apps. I think you are making one mistake
 that throws you off from there onwards. You don't need to put your app
 under SPARK_HOME. I would create it in its own folder somewhere, it
 follows the rules of any standalone scala program (including the
  layout). In the guide, $SPARK_HOME is only relevant to find the Readme
 file which they are parsing/word-counting. But otherwise the compile
 time dependencies on spark would be resolved via the sbt file (or the
 pom file if you look at the Java example).

 So for example I put my app under C:\Source\spark-code and the jar
 gets created in C:\Source\spark-code\target\scala-2.9.3 (or 2.10 if
 you're running with scala 2.10 as the example shows). But for that
 part of the guide, it's not any different than building a scala app.

 On Mon, Mar 24, 2014 at 3:44 PM, Diana Carroll dcarr...@cloudera.com
 wrote:
  Has anyone successfully followed the instructions on the Quick Start
 page of
  the Spark home page to run a standalone Scala application?  I can't,
 and I
  figure I must be missing something obvious!
 
  I'm trying to follow the instructions here as close to word for word as
  possible:
 
 http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala
 
  1.  The instructions don't say what directory to create my test
 application
  in, but later I'm instructed to run sbt/sbt so I conclude that my
 working
  directory must be $SPARK_HOME.  (Temporarily ignoring that it is a little
  weird to be working directly in the Spark distro.)
 
  2.  Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala.
  Copypaste in the code from the instructions exactly, replacing
  YOUR_SPARK_HOME with my spark home path.
 
  3.  Create $SPARK_HOME/mysparktest/simple.sbt.  Copypaste in the sbt
 file
  from the instructions
 
  4.  From the $SPARK_HOME I run sbt/sbt package.  It runs through the
  ENTIRE Spark project!  This takes several minutes, and at the end, it
 says
  Done packaging.  unfortunately, there's nothing in the
  $SPARK_HOME/mysparktest/ folder other than what I already had there.
 
  (Just for fun, I also did what I thought was more logical, which is set
 my
  working directory to $SPARK_HOME/mysparktest, and but $SPARK_HOME/sbt/sbt
  package, but that was even less successful: I got an error:
  awk: cmd. line:1: fatal: cannot open file `./project/build.properties'
 for
  reading (No such file or directory)
  Attempting to fetch sbt
  /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
  directory
  /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
  directory
  Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please
  install sbt manually from http://www.scala-sbt.org/
 
 
  So, help?  I'm sure these instructions work because people are following
  them every day, but I can't tell what they are supposed to do.
 
  Thanks!
  Diana
 



Re: quick start guide: building a standalone scala program

2014-03-24 Thread Diana Carroll
Thanks, Nan Zhu.

You say that my problems are because "you are in Spark directory, don't
need to do that actually, the dependency on Spark is resolved by sbt".

I did try it initially in what I thought was a much more typical place,
e.g. ~/mywork/sparktest1.  But as I said in my email:

(Just for fun, I also did what I thought was more logical, which is set my
working directory to $SPARK_HOME/mysparktest, and but $SPARK_HOME/sbt/sbt
package, but that was even less successful: I got an error:
awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for
reading (No such file or directory)
Attempting to fetch sbt
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
directory
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
directory
Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please
install sbt manually from http://www.scala-sbt.org/



On Mon, Mar 24, 2014 at 4:00 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

  Hi, Diana,

 See my inlined answer

 --
 Nan Zhu


 On Monday, March 24, 2014 at 3:44 PM, Diana Carroll wrote:

 Has anyone successfully followed the instructions on the Quick Start page
 of the Spark home page to run a standalone Scala application?  I can't,
 and I figure I must be missing something obvious!

 I'm trying to follow the instructions here as close to word for word as
 possible:

 http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala

 1.  The instructions don't say what directory to create my test
 application in, but later I'm instructed to run sbt/sbt so I conclude
 that my working directory must be $SPARK_HOME.  (Temporarily ignoring that
 it is a little weird to be working directly in the Spark distro.)


 You can create your application in any directory, just follow the sbt
 project dir structure


 2.  Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala.
  Copypaste in the code from the instructions exactly, replacing
 YOUR_SPARK_HOME with my spark home path.


 should be correct


 3.  Create $SPARK_HOME/mysparktest/simple.sbt.  Copypaste in the sbt file
 from the instructions


 should be correct


 4.  From the $SPARK_HOME I run sbt/sbt package.  It runs through the
 ENTIRE Spark project!  This takes several minutes, and at the end, it says
 Done packaging.  unfortunately, there's nothing in the
 $SPARK_HOME/mysparktest/ folder other than what I already had there.


 because you are in Spark directory, don't need to do that actually , the
 dependency on Spark is resolved by sbt



 (Just for fun, I also did what I thought was more logical, which is set my
 working directory to $SPARK_HOME/mysparktest, and but $SPARK_HOME/sbt/sbt
 package, but that was even less successful: I got an error:
 awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for
 reading (No such file or directory)
 Attempting to fetch sbt
 /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
 directory
 /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
 directory
 Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please
 install sbt manually from http://www.scala-sbt.org/


 So, help?  I'm sure these instructions work because people are following
 them every day, but I can't tell what they are supposed to do.

 Thanks!
 Diana





Re: quick start guide: building a standalone scala program

2014-03-24 Thread Diana Carroll
Thanks Ongen.

Unfortunately I'm not able to follow your instructions either.  In
particular:


 sbt compile
 sbt run arguments if any


This doesn't work for me because there's no program on my path called
sbt.  The instructions in the Quick Start guide are specific that I
should call $SPARK_HOME/sbt/sbt.  I don't have any other executable on my
system called sbt.

Did you download and install sbt separately?  In following the Quick Start
guide, that was not stated as a requirement, and I'm trying to run through
the guide word for word.

Diana


On Mon, Mar 24, 2014 at 4:12 PM, Ognen Duzlevski 
og...@plainvanillagames.com wrote:

 Diana,

 Anywhere on the filesystem you have read/write access (you need not be in
 your spark home directory):

 mkdir myproject
 cd myproject
 mkdir project
 mkdir target
 mkdir -p src/main/scala
 cp $mypath/$mymysource.scala src/main/scala/
 cp $mypath/myproject.sbt .

 Make sure that myproject.sbt has the following in it:

 name := "I NEED A NAME!"

 version := "I NEED A VERSION!"

 scalaVersion := "2.10.3"

 libraryDependencies += "org.apache.spark" % "spark-core_2.10" %
 "0.9.0-incubating"

 If you will be using Hadoop/HDFS functionality you will need the below
 line also

 libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"

 The above assumes you are using Spark 0.9 and Scala 2.10.3. If you are
 using 0.8.1 - adjust appropriately.

 That's it. Now you can do

 sbt compile
 sbt run arguments if any

 You can also do
 sbt package to produce a jar file of your code which you can then add to
 the SparkContext at runtime.

 In a more complicated project you may need to have a bit more involved
 hierarchy like com.github.dianacarroll which will then translate to
 src/main/scala/com/github/dianacarroll/ where you can put your multiple
 .scala files which will then have to be a part of a package
 com.github.dianacarroll (you can just put that as your first line in each
 of these scala files). I am new to Java/Scala so this is how I do it. More
 educated Java/Scala programmers may tell you otherwise ;)

 You can get more complicated with the sbt project subdirectory but you can
 read independently about sbt and what it can do, above is the bare minimum.

 Let me know if that helped.
 Ognen


 On 3/24/14, 2:44 PM, Diana Carroll wrote:

 Has anyone successfully followed the instructions on the Quick Start page
 of the Spark home page to run a standalone Scala application?  I can't,
 and I figure I must be missing something obvious!

 I'm trying to follow the instructions here as close to word for word as
 possible:
 http://spark.apache.org/docs/latest/quick-start.html#a-
 standalone-app-in-scala

 1.  The instructions don't say what directory to create my test
 application in, but later I'm instructed to run sbt/sbt so I conclude
 that my working directory must be $SPARK_HOME.  (Temporarily ignoring that
 it is a little weird to be working directly in the Spark distro.)

 2.  Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala.
  Copypaste in the code from the instructions exactly, replacing
 YOUR_SPARK_HOME with my spark home path.

 3.  Create $SPARK_HOME/mysparktest/simple.sbt.  Copypaste in the sbt
 file from the instructions

 4.  From the $SPARK_HOME I run sbt/sbt package.  It runs through the
 ENTIRE Spark project!  This takes several minutes, and at the end, it says
 Done packaging.  unfortunately, there's nothing in the
 $SPARK_HOME/mysparktest/ folder other than what I already had there.

 (Just for fun, I also did what I thought was more logical, which is set
 my working directory to $SPARK_HOME/mysparktest, and but
 $SPARK_HOME/sbt/sbt package, but that was even less successful: I got an
 error:
 awk: cmd. line:1: fatal: cannot open file `./project/build.properties'
 for reading (No such file or directory)
 Attempting to fetch sbt
 /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
 directory
 /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
 directory
 Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please
 install sbt manually from http://www.scala-sbt.org/


 So, help?  I'm sure these instructions work because people are following
 them every day, but I can't tell what they are supposed to do.

 Thanks!
 Diana





Re: quick start guide: building a standalone scala program

2014-03-24 Thread Diana Carroll
Yeah, that's exactly what I did. Unfortunately it doesn't work:

$SPARK_HOME/sbt/sbt package
awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for
reading (No such file or directory)
Attempting to fetch sbt
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
directory
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
directory
Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please
install sbt manually from http://www.scala-sbt.org/



On Mon, Mar 24, 2014 at 4:25 PM, Ognen Duzlevski 
og...@plainvanillagames.com wrote:

  You can use any sbt on your machine, including the one that comes with
 spark. For example, try:

 ~/path_to_spark/sbt/sbt compile
 ~/path_to_spark/sbt/sbt run arguments

 Or you can just add that to your PATH by:

  export PATH=$PATH:~/path_to_spark/sbt

 To make it permanent, you can add it to your ~/.bashrc or ~/.bash_profile
 or ??? depending on the system you are using. If you are on Windows, sorry,
 I can't offer any help there ;)

 Ognen


 On 3/24/14, 3:16 PM, Diana Carroll wrote:

 Thanks Ongen.

  Unfortunately I'm not able to follow your instructions either.  In
 particular:


 sbt compile
 sbt run arguments if any


  This doesn't work for me because there's no program on my path called
 sbt.  The instructions in the Quick Start guide are specific that I
 should call $SPARK_HOME/sbt/sbt.  I don't have any other executable on my
 system called sbt.

  Did you download and install sbt separately?  In following the Quick
 Start guide, that was not stated as a requirement, and I'm trying to run
 through the guide word for word.

  Diana


  On Mon, Mar 24, 2014 at 4:12 PM, Ognen Duzlevski 
 og...@plainvanillagames.com wrote:

 Diana,

 Anywhere on the filesystem you have read/write access (you need not be in
 your spark home directory):

 mkdir myproject
 cd myproject
 mkdir project
 mkdir target
 mkdir -p src/main/scala
 cp $mypath/$mymysource.scala src/main/scala/
 cp $mypath/myproject.sbt .

 Make sure that myproject.sbt has the following in it:

 name := "I NEED A NAME!"

 version := "I NEED A VERSION!"

 scalaVersion := "2.10.3"

 libraryDependencies += "org.apache.spark" % "spark-core_2.10" %
 "0.9.0-incubating"

 If you will be using Hadoop/HDFS functionality you will need the below
 line also

 libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"

 The above assumes you are using Spark 0.9 and Scala 2.10.3. If you are
 using 0.8.1 - adjust appropriately.

 That's it. Now you can do

 sbt compile
 sbt run arguments if any

 You can also do
 sbt package to produce a jar file of your code which you can then add to
 the SparkContext at runtime.

 In a more complicated project you may need to have a bit more involved
 hierarchy like com.github.dianacarroll which will then translate to
 src/main/scala/com/github/dianacarroll/ where you can put your multiple
 .scala files which will then have to be a part of a package
 com.github.dianacarroll (you can just put that as your first line in each
 of these scala files). I am new to Java/Scala so this is how I do it. More
 educated Java/Scala programmers may tell you otherwise ;)

  You can get more complicated with the sbt project subdirectory but you
 can read independently about sbt and what it can do, above is the bare
 minimum.

 Let me know if that helped.
 Ognen


 On 3/24/14, 2:44 PM, Diana Carroll wrote:

 Has anyone successfully followed the instructions on the Quick Start
 page of the Spark home page to run a standalone Scala application?  I
 can't, and I figure I must be missing something obvious!

 I'm trying to follow the instructions here as close to word for word
 as possible:

 http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala

 1.  The instructions don't say what directory to create my test
 application in, but later I'm instructed to run sbt/sbt so I conclude
 that my working directory must be $SPARK_HOME.  (Temporarily ignoring that
 it is a little weird to be working directly in the Spark distro.)

 2.  Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala.
  Copypaste in the code from the instructions exactly, replacing
 YOUR_SPARK_HOME with my spark home path.

 3.  Create $SPARK_HOME/mysparktest/simple.sbt.  Copypaste in the sbt
 file from the instructions

 4.  From the $SPARK_HOME I run sbt/sbt package.  It runs through the
 ENTIRE Spark project!  This takes several minutes, and at the end, it says
 Done packaging.  unfortunately, there's nothing in the
 $SPARK_HOME/mysparktest/ folder other than what I already had there.

 (Just for fun, I also did what I thought was more logical, which is set
 my working directory to $SPARK_HOME/mysparktest, and but
 $SPARK_HOME/sbt/sbt package, but that was even less successful: I got an
 error:
 awk: cmd. line:1: fatal: cannot open file `./project/build.properties'
 for reading (No such file or directory)
 Attempting to fetch sbt
 /usr/lib/spark/sbt/sbt: line

Re: quick start guide: building a standalone scala program

2014-03-24 Thread Diana Carroll
Thanks for your help, everyone.  Several folks have explained that I can
surely solve the problem by installing sbt.

But I'm trying to get the instructions working *as written on the Spark
website*.  The instructions not only don't have you install sbt
separately...they actually specifically have you use the sbt that is
distributed with Spark.

If it is not possible to build your own Spark programs with
Spark-distributed sbt, then that's a big hole in the Spark docs that I
shall file.  And if the sbt that is included with Spark is MEANT to be able
to compile your own Spark apps, then that's a product bug.

But before I file the bug, I'm still hoping I'm missing something, and
someone will point out that I'm missing a small step that will make the
Spark distribution of sbt work!

Diana



On Mon, Mar 24, 2014 at 4:52 PM, Yana Kadiyska yana.kadiy...@gmail.comwrote:

 Diana, I just tried it on a clean Ubuntu machine, with Spark 0.8
 (since like other folks I had sbt preinstalled on my usual machine)

 I ran the command exactly as Ognen suggested and see
  "Set current project to Simple Project" (do you see this -- you should
  at least be seeing this)
  and then a bunch of "Resolving ..."

 messages. I did get an error there, saying it can't find
 javax.servlet.orbit. I googled the error and found this thread:


 http://mail-archives.apache.org/mod_mbox/spark-user/201309.mbox/%3ccajbo4nexyzqe6zgreqjtzzz5zrcoavfen+wmbyced6n1epf...@mail.gmail.com%3E

 adding the IvyXML fragment they suggested helped in my case (but
 again, the build pretty clearly complained).

 If you're still having no luck, I suggest installing sbt and setting
 SBT_HOME... http://www.scala-sbt.org/

 In either case though, it's not a Spark-specific issue...Hopefully
 some of all this helps.

 On Mon, Mar 24, 2014 at 4:30 PM, Diana Carroll dcarr...@cloudera.com
 wrote:
  Yeah, that's exactly what I did. Unfortunately it doesn't work:
 
  $SPARK_HOME/sbt/sbt package
  awk: cmd. line:1: fatal: cannot open file `./project/build.properties'
 for
  reading (No such file or directory)
  Attempting to fetch sbt
  /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
  directory
  /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
  directory
  Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please
  install sbt manually from http://www.scala-sbt.org/
 
 
 
  On Mon, Mar 24, 2014 at 4:25 PM, Ognen Duzlevski
  og...@plainvanillagames.com wrote:
 
  You can use any sbt on your machine, including the one that comes with
  spark. For example, try:
 
  ~/path_to_spark/sbt/sbt compile
  ~/path_to_spark/sbt/sbt run arguments
 
  Or you can just add that to your PATH by:
 
   export PATH=$PATH:~/path_to_spark/sbt
 
  To make it permanent, you can add it to your ~/.bashrc or
 ~/.bash_profile
  or ??? depending on the system you are using. If you are on Windows,
 sorry,
  I can't offer any help there ;)
 
  Ognen
 
 
  On 3/24/14, 3:16 PM, Diana Carroll wrote:
 
  Thanks Ongen.
 
  Unfortunately I'm not able to follow your instructions either.  In
  particular:
 
 
  sbt compile
  sbt run arguments if any
 
 
  This doesn't work for me because there's no program on my path called
  sbt.  The instructions in the Quick Start guide are specific that I
 should
  call $SPARK_HOME/sbt/sbt.  I don't have any other executable on my
 system
  called sbt.
 
  Did you download and install sbt separately?  In following the Quick
 Start
  guide, that was not stated as a requirement, and I'm trying to run
 through
  the guide word for word.
 
  Diana
 
 
  On Mon, Mar 24, 2014 at 4:12 PM, Ognen Duzlevski
  og...@plainvanillagames.com wrote:
 
  Diana,
 
  Anywhere on the filesystem you have read/write access (you need not be
 in
  your spark home directory):
 
  mkdir myproject
  cd myproject
  mkdir project
  mkdir target
  mkdir -p src/main/scala
  cp $mypath/$mymysource.scala src/main/scala/
  cp $mypath/myproject.sbt .
 
  Make sure that myproject.sbt has the following in it:
 
   name := "I NEED A NAME!"

   version := "I NEED A VERSION!"

   scalaVersion := "2.10.3"

   libraryDependencies += "org.apache.spark" % "spark-core_2.10" %
   "0.9.0-incubating"

   If you will be using Hadoop/HDFS functionality you will need the below
   line also

   libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"
 
  The above assumes you are using Spark 0.9 and Scala 2.10.3. If you are
  using 0.8.1 - adjust appropriately.
 
  That's it. Now you can do
 
  sbt compile
  sbt run arguments if any
 
  You can also do
  sbt package to produce a jar file of your code which you can then add
 to
  the SparkContext at runtime.
 
  In a more complicated project you may need to have a bit more involved
  hierarchy like com.github.dianacarroll which will then translate to
  src/main/scala/com/github/dianacarroll/ where you can put your multiple
  .scala files which will then have to be a part of a package
  com.github.dianacarroll (you can just put

Re: Writing RDDs to HDFS

2014-03-24 Thread Diana Carroll
Ongen:

I don't know why your process is hanging, sorry.  But I do know that the
way saveAsTextFile works is that you give it a path to a directory, not a
file.  The file is saved in multiple parts, corresponding to the
partitions. (part-0, part-1 etc.)

(Presumably it does this because it allows each partition to be saved on
the local disk, to minimize network traffic.  It's how Hadoop works, too.)
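
(A quick sketch of what I mean, with a hypothetical path:)

val r = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8), 2)   // 2 partitions
r.saveAsTextFile("hdfs://namenode:8020/user/me/numbers")  // pass a directory path

// On HDFS you end up with a directory containing one part file per partition:
//   /user/me/numbers/part-00000
//   /user/me/numbers/part-00001
//   /user/me/numbers/_SUCCESS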




On Mon, Mar 24, 2014 at 5:00 PM, Ognen Duzlevski
og...@nengoiksvelzud.comwrote:

 Is someRDD.saveAsTextFile("hdfs://ip:port/path/final_filename.txt")
 supposed to work? Meaning, can I save files to the HDFS fs this way?

 I tried:

 val r = sc.parallelize(List(1,2,3,4,5,6,7,8))
 r.saveAsTextFile("hdfs://ip:port/path/file.txt")

 and it is just hanging. At the same time on my HDFS it created file.txt
 but as a directory which has subdirectories (the final one is empty).

 Thanks!
 Ognen



Re: quick start guide: building a standalone scala program

2014-03-24 Thread Diana Carroll
It is suggested implicitly in giving you the command "./sbt/sbt".  The
separately installed sbt isn't in a folder called "sbt", whereas Spark's
version is.  And more relevantly, just a few paragraphs earlier in the
tutorial you execute the command "sbt/sbt assembly", which definitely refers
to the spark install.

On Monday, March 24, 2014, Nan Zhu zhunanmcg...@gmail.com wrote:

 I found that I never read the document carefully and I never find that
 Spark document is suggesting you to use Spark-distributed sbt..

 Best,

 --
 Nan Zhu


 On Monday, March 24, 2014 at 5:47 PM, Diana Carroll wrote:

 Thanks for your help, everyone.  Several folks have explained that I can
 surely solve the problem by installing sbt.

 But I'm trying to get the instructions working *as written on the Spark
 website*.  The instructions not only don't have you install sbt
 separately...they actually specifically have you use the sbt that is
 distributed with Spark.

 If it is not possible to build your own Spark programs with
 Spark-distributed sbt, then that's a big hole in the Spark docs that I
 shall file.  And if the sbt that is included with Spark is MEANT to be able
 to compile your own Spark apps, then that's a product bug.

 But before I file the bug, I'm still hoping I'm missing something, and
 someone will point out that I'm missing a small step that will make the
 Spark distribution of sbt work!

 Diana



 On Mon, Mar 24, 2014 at 4:52 PM, Yana Kadiyska yana.kadiy...@gmail.comwrote:

 Diana, I just tried it on a clean Ubuntu machine, with Spark 0.8
 (since like other folks I had sbt preinstalled on my usual machine)

 I ran the command exactly as Ognen suggested and see
 Set current project to Simple Project (do you see this -- you should
 at least be seeing this)
 and then a bunch of Resolving ...

 messages. I did get an error there, saying it can't find
 javax.servlet.orbit. I googled the error and found this thread:


 http://mail-archives.apache.org/mod_mbox/spark-user/201309.mbox/%3ccajbo4nexyzqe6zgreqjtzzz5zrcoavfen+wmbyced6n1epf...@mail.gmail.com%3E

 adding the IvyXML fragment they suggested helped in my case (but
 again, the build pretty clearly complained).

 If you're still having no luck, I suggest installing sbt and setting
 SBT_HOME... http://www.scala-sbt.org/

 In either case though, it's not a Spark-specific issue...Hopefully
 some of all this helps.

 On Mon, Mar 24, 2014 at 4:30 PM, Diana Carroll dcarr...@cloudera.com
 wrote:
  Yeah, that's exactly what I did. Unfortunately it doesn't work:
 
  $SPARK_HOME/sbt/sbt package
  awk: cmd. line:1: fatal: cannot open file `./project/build.properties'
 for
  reading (No such file or directory)
  Attempting to fetch sbt
  /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
  directory
  /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
  directory
  Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please
  install sbt manually from http://www.scala-sbt.org/
 
 
 
  On Mon, Mar 24, 2014 at 4:25 PM, Ognen Duzlevski
  og...@plainvanillagames.com wrote:
 
  You can use any sbt on your machine, including the one that comes with
  spark. For example, try:
 
  ~/path_to_spark/sbt/sbt compile
  ~/path_to_spark/sbt/sbt run arguments
 
  Or you can just add that to your PATH by:
 
   export PATH=$PATH:~/path_to_spark/sbt
 
  To make it permanent, you can add it to your ~/.bashrc or
 ~/.bash_profile
  or ??? depending on the system you are using. If you are on Windows,
 sorry,
  I can't offer any help there ;)
 
  Ognen
 
 
  On 3/24/14, 3:16 PM, Diana Carroll wrote:
 
  Thanks Ongen.
 
  Unfortunately I'm not able to follow your instructions either.  In
  particular:
 
 
  sbt compile
  sbt run arguments if any
 
 
  This doesn't work for me because there's no program on my path called
  sbt.  The instructions in the Quick Start guide are specific that I
 sho




Re: example of non-line oriented input data?

2014-03-19 Thread Diana Carroll
If I don't call iter(), and just return treeiterator directly, I get an
error message that the object is not of an iterator type.  This is in
Python 2.6...perhaps a bug?

BUT I also realized my code was wrong.  It results in an RDD containing all
the tags in all the files.  What I really want is an RDD where each record
corresponds to a single file.  So if I have a thousand files, I should have
a thousand elements in my RDD, each of which is an ElementTree.  (Which I
can then use to map or flatMap to pull out the data I actually care about.)

So, this works:

def parsefile(iterator):
    s = ''
    for i in iterator: s = s + str(i)
    yield ElementTree.fromstring(s)

I would think the ability to process very large numbers of smallish XML
files is pretty common. The use case I'm playing with right now is using a
knowledge base of HTML documents.  Each document in the KB is a single
file, which in my experience is not an unusual configuration.  I'd like to
be able to suck the whole KB into an RDD and then do analysis such as
"which keywords are most commonly used in the KB" or "is there a
correlation between certain user attributes and the KB articles they
request" and so on.

Unfortunately I'm not sure I'm best to answer your question about non-text
InputFormats to support.  I'm fairly new to Hadoop (about 8 months) and I'm
not in the field.  My background is in app servers, ecommerce and business
process management, so that's my bias.  From that perspective, it would be
really useful to be able to work with XML/HTML and CSV files...but are
those what big data analysts are actually using Spark for?  I dunno.  And,
really, if I were actually in those fields, I'd be getting the data from a
DB using Shark, right?

Diana


On Tue, Mar 18, 2014 at 6:27 PM, Matei Zaharia matei.zaha...@gmail.comwrote:

 Hi Diana,

 This seems to work without the iter() in front if you just return
 treeiterator. What happened when you didn't include that? Treeiterator
 should return an iterator.

 Anyway, this is a good example of mapPartitions. It's one where you want
 to view the whole file as one object (one XML here), so you couldn't
 implement this using a flatMap, but you still want to return multiple
 values. The MLlib example you saw needs Python 2.7 because unfortunately
 that is a requirement for our Python MLlib support (see
 http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries).
 We'd like to relax this later but we're using some newer features of NumPy
 and Python. The rest of PySpark works on 2.6.

 In terms of the size in memory, here both the string s and the XML tree
 constructed from it need to fit in, so you can't work on very large
 individual XML files. You may be able to use a streaming XML parser instead
 to extract elements from the data in a streaming fashion, without ever
 materializing the whole tree.
 http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader
 is one example.
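
 A minimal sketch of that streaming idea using ElementTree's iterparse (the
 raw string for each file still has to fit in memory here, but the full tree
 does not have to be materialized; the function name is illustrative, and
 mydata is assumed to be the textFile RDD from the snippet below):

 import xml.etree.ElementTree as ET
 from StringIO import StringIO          # Python 2.6

 def parse_countries_streaming(iterator):
     buf = StringIO(''.join(str(line) for line in iterator))
     for event, elem in ET.iterparse(buf):
         if elem.tag == 'country':
             yield dict(elem.attrib)    # copy, since clear() empties attrib
         elem.clear()                   # free each element's subtree as we go

 mydata.mapPartitions(parse_countries_streaming).collect()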

 Matei

 On Mar 18, 2014, at 7:49 AM, Diana Carroll dcarr...@cloudera.com wrote:

 Well, if anyone is still following this, I've gotten the following code
 working which in theory should allow me to parse whole XML files: (the
 problem was that I can't return the tree iterator directly.  I have to call
 iter().  Why?)

 import xml.etree.ElementTree as ET

 # two source files, format <data><country name="...">...</country></data>
 mydata = sc.textFile("file:/home/training/countries*.xml")

 def parsefile(iterator):
     s = ''
     for i in iterator:
         s = s + str(i)
     tree = ET.fromstring(s)
     treeiterator = tree.getiterator("country")
     # why do I have to convert an iterator to an iterator?  not sure but required
     return iter(treeiterator)

 mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()

 The output is what I expect:
 [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]

 BUT I'm a bit concerned about the construction of the string s.  How big
 can my file be before converting it to a string becomes problematic?



 On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll dcarr...@cloudera.comwrote:

 Thanks, Matei.

 In the context of this discussion, it would seem mapPartitions is
 essential, because it's the only way I'm going to be able to process each
 file as a whole, in our example of a large number of small XML files which
 need to be parsed as a whole file because records are not required to be on
 a single line.

 The theory makes sense but I'm still utterly lost as to how to implement
 it.  Unfortunately there's only a single example of the use of
 mapPartitions in any of the Python example programs, which is the log
 regression example, which I can't run because it requires Python 2.7 and
 I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6
 is unsupported...is it?)

 I'd really really love to see a real life example of a Python use of
 mapPartitions.  I do appreciate the very simple examples

Re: example of non-line oriented input data?

2014-03-19 Thread Diana Carroll
Actually, thinking more on this question, Matei: I'd definitely say support
for Avro.  There's a lot of interest in this!!


On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia matei.zaha...@gmail.comwrote:

 BTW one other thing -- in your experience, Diana, which non-text
 InputFormats would be most useful to support in Python first? Would it be
 Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or
 something else? I think a per-file text input format that does the stuff we
 did here would also be good.

 Matei


 On Mar 18, 2014, at 3:27 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Hi Diana,

 This seems to work without the iter() in front if you just return
 treeiterator. What happened when you didn't include that? Treeiterator
 should return an iterator.

 Anyway, this is a good example of mapPartitions. It's one where you want
 to view the whole file as one object (one XML here), so you couldn't
 implement this using a flatMap, but you still want to return multiple
 values. The MLlib example you saw needs Python 2.7 because unfortunately
 that is a requirement for our Python MLlib support (see
 http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries).
 We'd like to relax this later but we're using some newer features of NumPy
 and Python. The rest of PySpark works on 2.6.

 In terms of the size in memory, here both the string s and the XML tree
 constructed from it need to fit in, so you can't work on very large
 individual XML files. You may be able to use a streaming XML parser instead
 to extract elements from the data in a streaming fashion, without every
 materializing the whole tree.
 http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreaderis
  one example.

 Matei

 On Mar 18, 2014, at 7:49 AM, Diana Carroll dcarr...@cloudera.com wrote:

 Well, if anyone is still following this, I've gotten the following code
 working which in theory should allow me to parse whole XML files: (the
 problem was that I can't return the tree iterator directly.  I have to call
 iter().  Why?)

 import xml.etree.ElementTree as ET

 # two source files, format <data><country name="...">...</country></data>
 mydata = sc.textFile("file:/home/training/countries*.xml")

 def parsefile(iterator):
     s = ''
     for i in iterator:
         s = s + str(i)
     tree = ET.fromstring(s)
     treeiterator = tree.getiterator("country")
     # why do I have to convert an iterator to an iterator?  not sure but required
     return iter(treeiterator)

 mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()

 The output is what I expect:
 [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]

 BUT I'm a bit concerned about the construction of the string s.  How big
 can my file be before converting it to a string becomes problematic?



 On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll dcarr...@cloudera.comwrote:

 Thanks, Matei.

 In the context of this discussion, it would seem mapPartitions is
 essential, because it's the only way I'm going to be able to process each
 file as a whole, in our example of a large number of small XML files which
 need to be parsed as a whole file because records are not required to be on
 a single line.

 The theory makes sense but I'm still utterly lost as to how to implement
 it.  Unfortunately there's only a single example of the use of
 mapPartitions in any of the Python example programs, which is the log
 regression example, which I can't run because it requires Python 2.7 and
 I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6
 is unsupported...is it?)

 I'd really really love to see a real life example of a Python use of
 mapPartitions.  I do appreciate the very simple examples you provided, but
 (perhaps because of my novice status on Python) I can't figure out how to
 translate those to a real world situation in which I'm building RDDs from
 files, not inline collections like [(1,2),(2,3)].

 Also, you say that the function called in mapPartitions can return a
 collection OR an iterator.  I tried returning an iterator by calling
 ElementTree getiterator function, but still got the error telling me my
 object was not an iterator.

 If anyone has a real life example of mapPartitions returning a Python
 iterator, that would be fabulous.

 Diana


 On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia 
 matei.zaha...@gmail.comwrote:

 Oh, I see, the problem is that the function you pass to mapPartitions
 must itself return an iterator or a collection. This is used so that you
 can return multiple output records for each input record. You can implement
 most of the existing map-like operations in Spark, such as map, filter,
 flatMap, etc, with mapPartitions, as well as new ones that might do a
 sliding window over each partition for example, or accumulate data across
 elements (e.g. to compute a sum).
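
 A rough sketch of that sliding-window idea (pairwise windows that do not
 cross partition boundaries; purely illustrative):

 def sliding_pairs(iterator):
     prev = None
     for x in iterator:
         if prev is not None:    # assumes None is not a real element
             yield (prev, x)
         prev = x

 print sc.parallelize([1, 2, 3, 4, 5], 2).mapPartitions(sliding_pairs).collect()
 # e.g. [(1, 2), (3, 4), (4, 5)]  (no (2, 3) pair: 2 and 3 sit in different partitions)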

 For example, if you have data = sc.parallelize([1, 2, 3, 4], 2

Re: example of non-line oriented input data?

2014-03-18 Thread Diana Carroll
Well, if anyone is still following this, I've gotten the following code
working which in theory should allow me to parse whole XML files: (the
problem was that I can't return the tree iterator directly.  I have to call
iter().  Why?)

import xml.etree.ElementTree as ET

# two source files, format <data><country name="...">...</country></data>
mydata = sc.textFile("file:/home/training/countries*.xml")

def parsefile(iterator):
    s = ''
    for i in iterator:
        s = s + str(i)
    tree = ET.fromstring(s)
    treeiterator = tree.getiterator("country")
    # why do I have to convert an iterator to an iterator?  not sure but required
    return iter(treeiterator)

mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()

The output is what I expect:
[{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]

BUT I'm a bit concerned about the construction of the string s.  How big
can my file be before converting it to a string becomes problematic?



On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll dcarr...@cloudera.comwrote:

 Thanks, Matei.

  In the context of this discussion, it would seem mapPartitions is
 essential, because it's the only way I'm going to be able to process each
 file as a whole, in our example of a large number of small XML files which
 need to be parsed as a whole file because records are not required to be on
 a single line.

 The theory makes sense but I'm still utterly lost as to how to implement
 it.  Unfortunately there's only a single example of the use of
 mapPartitions in any of the Python example programs, which is the log
 regression example, which I can't run because it requires Python 2.7 and
 I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6
 is unsupported...is it?)

 I'd really really love to see a real life example of a Python use of
 mapPartitions.  I do appreciate the very simple examples you provided, but
 (perhaps because of my novice status on Python) I can't figure out how to
 translate those to a real world situation in which I'm building RDDs from
 files, not inline collections like [(1,2),(2,3)].

 Also, you say that the function called in mapPartitions can return a
 collection OR an iterator.  I tried returning an iterator by calling
 ElementTree getiterator function, but still got the error telling me my
 object was not an iterator.

 If anyone has a real life example of mapPartitions returning a Python
 iterator, that would be fabulous.

 Diana


 On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia matei.zaha...@gmail.comwrote:

 Oh, I see, the problem is that the function you pass to mapPartitions
 must itself return an iterator or a collection. This is used so that you
 can return multiple output records for each input record. You can implement
 most of the existing map-like operations in Spark, such as map, filter,
 flatMap, etc, with mapPartitions, as well as new ones that might do a
 sliding window over each partition for example, or accumulate data across
 elements (e.g. to compute a sum).

 For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this
 will work:

 >>> data.mapPartitions(lambda x: x).collect()
 [1, 2, 3, 4]   # Just return the same iterator, doing nothing

 >>> data.mapPartitions(lambda x: [list(x)]).collect()
 [[1, 2], [3, 4]]   # Group together the elements of each partition in a single list (like glom)

 >>> data.mapPartitions(lambda x: [sum(x)]).collect()
 [3, 7]   # Sum each partition separately

 However something like data.mapPartitions(lambda x: sum(x)).collect()
 will *not* work because sum returns a number, not an iterator. That's why I
 put sum(x) inside a list above.
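
 In the same spirit, a rough sketch of filter re-expressed with mapPartitions
 (assuming the partition function is written as a generator, so calling it
 returns an iterator):

 def filter_with_mapPartitions(rdd, pred):
     def apply_pred(iterator):
         for x in iterator:
             if pred(x):
                 yield x          # a generator is itself an iterator
     return rdd.mapPartitions(apply_pred)

 print filter_with_mapPartitions(sc.parallelize(range(10), 2),
                                 lambda x: x % 2 == 0).collect()
 # [0, 2, 4, 6, 8]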

 In practice mapPartitions is most useful if you want to share some data
 or work across the elements. For example maybe you want to load a lookup
 table once from an external file and then check each element in it, or sum
 up a bunch of elements without allocating a lot of vector objects.
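
 A rough sketch of the lookup-table pattern (the path and file format here
 are assumptions, purely for illustration; mydata is any RDD of strings):

 def check_against_lookup(iterator):
     # the lookup table is loaded once per partition, not once per element
     with open('/tmp/lookup.txt') as f:          # hypothetical path
         lookup = set(line.strip() for line in f)
     for element in iterator:
         if element in lookup:
             yield element

 matches = mydata.mapPartitions(check_against_lookup)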

 Matei


 On Mar 17, 2014, at 11:25 AM, Diana Carroll dcarr...@cloudera.com
 wrote:

  There's also mapPartitions, which gives you an iterator for each
 partition instead of an array. You can then return an iterator or list of
 objects to produce from that.
 
  I confess, I was hoping for an example of just that, because i've not
 yet been able to figure out how to use mapPartitions.  No doubt this is
 because i'm a rank newcomer to Python, and haven't fully wrapped my head
 around iterators.  All I get so far in my attempts to use mapPartitions is
 the darned suchnsuch is not an iterator error.
 
  def myfunction(iterator): return [1,2,3]
  mydata.mapPartitions(lambda x: myfunction(x)).take(2)
 
 
 
 
 
  On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:
  Here's an example of getting together all lines in a file as one string:
 
  $ cat dir/a.txt
  Hello
  world!
 
  $ cat dir/b.txt
  What's
  up??
 
  $ bin/pyspark
  >>> files = sc.textFile("dir")

  >>> files.collect()
  [u'Hello', u'world!', uWhat's

Re: example of non-line oriented input data?

2014-03-17 Thread Diana Carroll
There's also mapPartitions, which gives you an iterator for each partition
instead of an array. You can then return an iterator or list of objects to
produce from that.

I confess, I was hoping for an example of just that, because i've not yet
been able to figure out how to use mapPartitions.  No doubt this is because
i'm a rank newcomer to Python, and haven't fully wrapped my head around
iterators.  All I get so far in my attempts to use mapPartitions is the
darned suchnsuch is not an iterator error.

def myfunction(iterator): return [1,2,3]
mydata.mapPartitions(lambda x: myfunction(x)).take(2)
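
A minimal sketch of the pattern mapPartitions expects: a partition function
written as a Python generator, so that calling it returns an iterator (names
here are illustrative):

def count_partition(iterator):
    count = 0
    for record in iterator:
        count += 1
    yield count          # exactly one output element per partition

data = sc.parallelize([1, 2, 3, 4], 2)
print data.mapPartitions(count_partition).collect()
# e.g. [2, 2]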





On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia matei.zaha...@gmail.comwrote:

 Here's an example of getting together all lines in a file as one string:

 $ cat dir/a.txt
 Hello
 world!

 $ cat dir/b.txt
 What's
 up??

 $ bin/pyspark
 >>> files = sc.textFile("dir")

 >>> files.collect()
 [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not what we want

 >>> files.glom().collect()
 [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines

 >>> files.glom().map(lambda a: "\n".join(a)).collect()
 [u'Hello\nworld!', u"What's\nup??"]   # join back each file into a single string

 The glom() method groups all the elements of each partition of an RDD into
 an array, giving you an RDD of arrays of objects. If your input is small
 files, you always have one partition per file.

 There's also mapPartitions, which gives you an iterator for each partition
 instead of an array. You can then return an iterator or list of objects to
 produce from that.
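
 Following the same glom pattern, a small sketch (assuming the countries*.xml
 files discussed earlier in this thread) of turning each whole file into a
 parsed tree:

 import xml.etree.ElementTree as ET

 xmlfiles = sc.textFile("file:/home/training/countries*.xml")
 trees = xmlfiles.glom().map(lambda lines: ET.fromstring("\n".join(lines)))
 print trees.flatMap(lambda t: t.getiterator("country")).map(lambda e: e.attrib).collect()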

 Matei


 On Mar 17, 2014, at 10:46 AM, Diana Carroll dcarr...@cloudera.com wrote:

  Thanks Matei.  That makes sense.  I have here a dataset of many many
 smallish XML files, so using mapPartitions that way would make sense.  I'd
 love to see a code example though ...It's not as obvious to me how to do
 that as I probably should be.
 
  Thanks,
  Diana
 
 
  On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:
  Hi Diana,
 
  Non-text input formats are only supported in Java and Scala right now,
 where you can use sparkContext.hadoopFile or .hadoopDataset to load data
 with any InputFormat that Hadoop MapReduce supports. In Python, you
 unfortunately only have textFile, which gives you one record per line. For
 JSON, you'd have to fit the whole JSON object on one line as you said.
 Hopefully we'll also have some other forms of input soon.
 
  If your input is a collection of separate files (say many .xml files),
 you can also use mapPartitions on it to group together the lines because
 each input file will end up being a single dataset partition (or map task).
 This will let you concatenate the lines in each file and parse them as one
 XML object.
 
  Matei
 
  On Mar 17, 2014, at 9:52 AM, Diana Carroll dcarr...@cloudera.com
 wrote:
 
  Thanks, Krakna, very helpful.  The way I read the code, it looks like
 you are assuming that each line in foo.log contains a complete json object?
  (That is, that the data doesn't contain any records that are split into
 multiple lines.)  If so, is that because you know that to be true of your
 data?  Or did you do as Nicholas suggests and have some preprocessing on
 the text input to flatten the data in that way?
 
  Thanks,
  Diana
 
 
  On Mon, Mar 17, 2014 at 12:09 PM, Krakna H shankark+...@gmail.com
 wrote:
  Katrina,
 
  Not sure if this is what you had in mind, but here's some simple
 pyspark code that I recently wrote to deal with JSON files.
 
 from pyspark import SparkContext, SparkConf
 from operator import add
 import json
 import random
 import numpy as np


 def concatenate_paragraphs(sentence_array):
     return ' '.join(sentence_array).split(' ')


 logFile = 'foo.json'
 conf = SparkConf()
 conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")

 sc = SparkContext(conf=conf)

 logData = sc.textFile(logFile).cache()

 num_lines = logData.count()
 print 'Number of lines: %d' % num_lines

 # JSON object has the structure: {"key": {'paragraphs': ["sentence1", "sentence2", ...]}}
 tm = logData.map(lambda s: (json.loads(s)['key'],
                             len(concatenate_paragraphs(json.loads(s)['paragraphs']))))

 tm = tm.reduceByKey(lambda _, x: _ + x)

 op = tm.collect()
 for key, num_words in op:
     print 'state: %s, num_words: %d' % (key, num_words)
 
  On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User
 List] [hidden email] wrote:
  I don't actually have any data.  I'm writing a course that teaches
 students how to do this sort of thing and am interested in looking at a
 variety of real life examples of people doing things like that.  I'd love
 to see some working code implementing the obvious work-around you
 mention...do you have any to share?  It's an approach that makes a lot