Re: Question regarding doing aggregation over custom partitions

2014-05-03 Thread Arun Swami
Thanks, that was what I was missing!


*Arun Swami*

On Fri, May 2, 2014 at 4:28 AM, Mayur Rustagi wrote:

> You need to first partition the data by the key.
> Use mapPartitions instead of map.
> Mayur Rustagi
> On Fri, May 2, 2014 at 5:33 AM, Arun Swami wrote:
>> Hi,
>> I am a newbie to Spark. I looked for documentation or examples to answer
>> my question but came up empty-handed.
>> I don't know whether I am using the right terminology, but here goes.
>> I have a file of records. Initially, I had the following Spark program (I
>> am omitting all the surrounding code and focusing only on the Spark related
>> code):
>> ...
>> val recordsRDD = sc.textFile(pathSpec, 2).cache
>> val countsRDD: RDD[(String, Int)] = recordsRDD
>>   .flatMap(x => getCombinations(x))
>>   .map(e => (e, 1))
>>   .reduceByKey(_ + _)
>> ...
>> Here getCombinations() is a function I have written that takes a record
>> and returns List[String].
>> This program works as expected.
>> Now, I want to do the following. I want to partition the records in
>> recordsRDD by some key extracted from each record. I do this as follows:
>> val keyValueRecordsRDD: RDD[(String, String)] =
>>   recordsRDD.map(getKeyValueRecord(_))
>> Here getKeyValueRecord() is a function I have written that takes a record
>> and returns a Tuple2 of a key and the original record.
>> Now I want to do the same operations as before (getCombinations(), and
>> count occurrences) BUT on each partition as defined by the key.
>> Essentially, I want to apply the operations individually in each partition.
>> In a separate step, I want to recover the
>> global counts across all partitions while keeping the partition-based
>> counts.
>> How can I do this in Spark?
>> Thanks!
>> arun
>> *Arun Swami*
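One way to keep both the per-key and the global counts, sketched in plain Python so it runs without a cluster: use the extracted key and the combination together as a composite key, then reduce a second time on the combination alone. This is a swapped-in technique rather than the mapPartitions approach suggested above, and `get_key_value_record` / `get_combinations` are hypothetical stand-ins for Arun's functions. In Spark the two Counter loops would roughly correspond to a flatMap emitting ((key, combo), 1) followed by reduceByKey, and then a map to (combo, count) followed by another reduceByKey.

```python
from collections import Counter
from itertools import combinations


def get_key_value_record(record):
    """Hypothetical stand-in for getKeyValueRecord(): the key is the first CSV field."""
    key, _, rest = record.partition(",")
    return key, rest


def get_combinations(record):
    """Hypothetical stand-in for getCombinations(): every pair of distinct items."""
    items = sorted(set(record.split(",")))
    return ["|".join(pair) for pair in combinations(items, 2)]


def per_key_counts(records):
    """Count combinations separately for each key, using (key, combo) as a composite key."""
    counts = Counter()
    for record in records:
        key, value = get_key_value_record(record)
        for combo in get_combinations(value):
            counts[(key, combo)] += 1
    return counts


def global_counts(keyed_counts):
    """Drop the key and add up again, i.e. a second reduce on combo alone."""
    totals = Counter()
    for (_key, combo), n in keyed_counts.items():
        totals[combo] += n
    return totals


# Usage with three made-up records under two keys.
by_key = per_key_counts(["k1,a,b,c", "k1,a,b", "k2,a,b"])
overall = global_counts(by_key)
```

The per-key counts stay available in `by_key` after the global totals are computed, which is the "keep both" requirement from the question.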

Re: string to int conversion

2014-05-03 Thread Sean Owen
On Sat, May 3, 2014 at 2:00 AM, SK wrote:
> 1) I have a CSV file where one of the fields has integer data, but it appears
> as strings: "1", "3", etc. I tried using toInt to implicitly convert the
> strings to int after reading (field(3).toInt). But I got a
> NumberFormatException. So I defined my own conversion
> as follows, but I still get a NumberFormatException: the toInt function on
> StringOps is failing. Any idea how I can convert strings to int?

toInt certainly works, and you need not write your own conversion,
but a string with double quotes is not a valid number. To be safest,
you might write an "unquote" function that strips the surrounding quotes
if they exist, rather than just removing all quotes in the string. (Or, as
suggested earlier in the thread, use a Java library that specializes in
handling all the details of CSV, including quoted and escaped fields. I
liked SuperCSV.)
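A minimal sketch of the "unquote" idea in Python (the question is about Scala's toInt, but Python's int() rejects a quoted string in the same way). The helper name and sample lines are invented; the standard csv module stands in for a dedicated CSV library such as SuperCSV.

```python
import csv
import io


def unquote(field):
    """Strip one pair of surrounding double quotes, only if both are present."""
    if len(field) >= 2 and field[0] == '"' and field[-1] == '"':
        return field[1:-1]
    return field


# A naive split keeps the quotes, so int('"3"') would raise ValueError.
fields = 'a,b,c,"3"'.split(",")
value = int(unquote(fields[3]))

# A real CSV parser removes the quotes itself and copes with embedded commas.
row = next(csv.reader(io.StringIO('x,"1,000","3"')))
parsed = int(row[2])
```

Stripping only a matched pair avoids silently "fixing" malformed fields such as `5"`, which would rather be surfaced as an error.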

Re: "sbt/sbt run" command returns a JVM problem

2014-05-03 Thread Carter
Hi, thanks for all your help.
I tried your setting in the sbt file, but the problem is still there.

The Java setting in my sbt file is:
java \
  -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m \
  -jar ${JAR} \
  "$@"

I have tried to set these 3 parameters bigger and smaller, but nothing
works. Did I change the right thing?

Thank you very much.


Re: Spark's behavior

2014-05-03 Thread Eduardo Costa Alfaia
Hi TD, thanks for the reply.
I ran this last experiment on one computer, in local mode, but I think the
time gap grows when I add more computers. I will run it again now with 8
workers and 1 word source and see what's going on. I will also check the
clocks, as Andrew suggested.
On May 3, 2014, at 1:19, Tathagata Das wrote:

> From the logs, I see that print() starts printing stuff 10 seconds after
> the context is started. Those 10 seconds are taken by the initial empty job
> (50 map + 20 reduce tasks) that Spark Streaming starts to ensure all the
> executors have started. Somehow the first empty task takes 7-8 seconds to
> complete. See if this can be reproduced by running a simple, empty job in
> the Spark shell (in the same cluster), and check whether the first task
> takes 7-8 seconds.
> Either way, I didn't see the 30-second gap, only a 10-second gap. And that
> does not seem to be a persistent problem, as after those 10 seconds the data
> is being received and processed.
> TD
> On Fri, May 2, 2014 at 2:14 PM, Eduardo Costa Alfaia wrote:
> Hi TD,
> I got more information today using Spark 1.0 RC3, and the situation
> remains the same.
> The lines begin after 17 seconds:
> 14/05/02 21:52:25 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140502215225-0005/0 on hostPort computer8.ant-net:57229 with 2 cores, 2.0 GB RAM
> 14/05/02 21:52:25 INFO AppClient$ClientActor: Executor updated: app-20140502215225-0005/0 is now RUNNING
> 14/05/02 21:52:25 INFO ReceiverTracker: ReceiverTracker started
> 14/05/02 21:52:26 INFO ForEachDStream: metadataCleanupDelay = -1
> 14/05/02 21:52:26 INFO SocketInputDStream: metadataCleanupDelay = -1
> 14/05/02 21:52:26 INFO SocketInputDStream: Slide time = 1000 ms
> 14/05/02 21:52:26 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 14/05/02 21:52:26 INFO SocketInputDStream: Checkpoint interval = null
> 14/05/02 21:52:26 INFO SocketInputDStream: Remember duration = 1000 ms
> 14/05/02 21:52:26 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@5433868e
> 14/05/02 21:52:26 INFO ForEachDStream: Slide time = 1000 ms
> 14/05/02 21:52:26 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 14/05/02 21:52:26 INFO ForEachDStream: Checkpoint interval = null
> 14/05/02 21:52:26 INFO ForEachDStream: Remember duration = 1000 ms
> 14/05/02 21:52:26 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1ebdbc05
> 14/05/02 21:52:26 INFO SparkContext: Starting job: collect at ReceiverTracker.scala:270
> 14/05/02 21:52:26 INFO RecurringTimer: Started timer for JobGenerator at time 1399060346000
> 14/05/02 21:52:26 INFO JobGenerator: Started JobGenerator at 1399060346000 ms
> 14/05/02 21:52:26 INFO JobScheduler: Started JobScheduler
> 14/05/02 21:52:26 INFO DAGScheduler: Registering RDD 3 (reduceByKey at ReceiverTracker.scala:270)
> 14/05/02 21:52:26 INFO ReceiverTracker: Stream 0 received 0 blocks
> 14/05/02 21:52:26 INFO DAGScheduler: Got job 0 (collect at ReceiverTracker.scala:270) with 20 output partitions (allowLocal=false)
> 14/05/02 21:52:26 INFO DAGScheduler: Final stage: Stage 0(collect at ReceiverTracker.scala:270)
> 14/05/02 21:52:26 INFO DAGScheduler: Parents of final stage: List(Stage 1)
> 14/05/02 21:52:26 INFO JobScheduler: Added jobs for time 1399060346000 ms
> 14/05/02 21:52:26 INFO JobScheduler: Starting job streaming job 1399060346000 ms.0 from job set of time 1399060346000 ms
> 14/05/02 21:52:26 INFO JobGenerator: Checkpointing graph for time 1399060346000 ms
> ---
> 14/05/02 21:52:26 INFO DStreamGraph: Updating checkpoint data for time 1399060346000 ms
> Time: 1399060346000 ms
> ---
> 14/05/02 21:52:26 INFO JobScheduler: Finished job streaming job 1399060346000 ms.0 from job set of time 1399060346000 ms
> 14/05/02 21:52:26 INFO JobScheduler: Total delay: 0.325 s for time 1399060346000 ms (execution: 0.024 s)
> 14/05/02 21:52:42 INFO JobScheduler: Added jobs for time 1399060362000 ms
> 14/05/02 21:52:42 INFO JobGenerator: Checkpointing graph for time 1399060362000 ms
> 14/05/02 21:52:42 INFO DStreamGraph: Updating checkpoint data for time 1399060362000 ms
> 14/05/02 21:52:42 INFO DStreamGraph: Updated checkpoint data for time 1399060362000 ms
> 14/05/02 21:52:42 INFO JobScheduler: Starting job streaming job 1399060362000 ms.0 from job set of time 1399060362000 ms
> 14/05/02 21:52:42 INFO SparkContext: Starting job: take at DStream.scala:593
> 14/05/02 21:52:42 INFO DAGScheduler: Got job 2 (take at DStream.scala:593) with 1 output partitions (allowLocal=true)
> 14/05/02 21:52:42 INFO DAGScheduler: Final stage: Stage 3(take at DStream.scala:593)
> 14/05/02 21:52:42 INFO DAGScheduler: Parents of final stage: List()

what's local[n]

2014-05-03 Thread Weide Zhang
In the Spark Kafka example,

it says

   `./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`

Can anyone tell me what local[2] represents? I thought the master URL
should be something like spark://hostname:port.

Also, the thread count is specified as 1 in the Kafka example. What will
happen if the thread count is more than 1? Does that mean multiple Kafka
consumers will be created on different Spark workers? I'm not sure how the
code maps to real-time worker thread allocation. Is there any documentation
on that?


Re: Crazy Kryo Exception

2014-05-03 Thread Soren Macbeth
Poking around in the bowels of Scala, it seems like this has something to
do with implicit Scala -> Java collection munging. Why would it be doing
this, and where? The stack trace given is entirely unhelpful to me. Is there
a better one buried in my task logs? None of my tasks actually failed, so
it seems that it is dying while trying to fetch results from my tasks to
return them to the driver.

Am I close?

On Fri, May 2, 2014 at 3:35 PM, Soren Macbeth wrote:

> Hallo,
> I'm getting this rather crazy Kryo exception trying to run my Spark job:
> Exception in thread "main" org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Can not set final scala.collection.convert.Wrappers field scala.collection.convert.Wrappers$SeqWrapper.$outer to my.custom.class
> Serialization trace:
> $outer (scala.collection.convert.Wrappers$SeqWrapper)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
> ...
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> ...
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ...
> I have a Kryo serializer for my.custom.class, and I've registered it using
> a custom registrator on my context object. I've tested the custom
> serializer and the registrator locally, and they both function as expected.
> This job is running Spark 0.9.1 under Mesos in fine-grained mode.
> Please help!

Re: what's local[n]

2014-05-03 Thread Andrew Ash
Hi Weide,

The answer to your first question about local[2] can be found in the
"Running the Examples and Shell" section of the Spark documentation:

> Note that all of the sample programs take a parameter specifying
> the cluster URL to connect to. This can be a URL for a distributed cluster,
> or local to run locally with one thread, or local[N] to run locally with
> N threads. You should start by using local for testing.

For the second, I'm not sure how the thread count affects the number of
Kafka consumers. My first guess would be that a thread is 1:1 with a Kafka
consumer, but I'm not super familiar with Spark Streaming.
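To make the quoted convention concrete, here is a small classifier for the three master URL forms that come up in this thread. It is a hypothetical helper written for illustration, not Spark's own parsing code, and it deliberately ignores other master URL forms. One streaming-specific detail: each receiver occupies one of the local threads, which is presumably why the Kafka example asks for local[2] rather than plain local.

```python
import re


def describe_master(master):
    """Classify a master URL. Hypothetical helper for illustration, not Spark's parser."""
    if master == "local":
        return ("local", 1)  # one worker thread
    m = re.fullmatch(r"local\[(\d+)\]", master)
    if m:
        return ("local", int(m.group(1)))  # N worker threads in a single JVM
    m = re.fullmatch(r"spark://([^:/]+):(\d+)", master)
    if m:
        return ("standalone", m.group(1), int(m.group(2)))
    raise ValueError("unrecognized master URL: %r" % master)


kafka_example = describe_master("local[2]")
cluster = describe_master("spark://hostname:7077")
```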


On Sat, May 3, 2014 at 7:00 AM, Weide Zhang wrote:

> In the Spark Kafka example,
> it says
> `./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
> Can anyone tell me what local[2] represents? I thought the master URL
> should be something like spark://hostname:port.
> Also, the thread count is specified as 1 in the Kafka example. What will
> happen if the thread count is more than 1? Does that mean multiple Kafka
> consumers will be created on different Spark workers? I'm not sure how
> the code maps to real-time worker thread allocation. Is there any
> documentation on that?
> Weide

Lease Exception hadoop 2.4

2014-05-03 Thread Andre Kuhnen
Hello, I am getting this warning after upgrading to Hadoop 2.4, when I try to
write something to HDFS. The content is written correctly, but I do
not like this warning.

Do I have to compile Spark against Hadoop 2.4?

WARN TaskSetManager: Loss was due to org.apache.hadoop.ipc.RemoteException


Spark-1.0.0-rc3 compiled against Hadoop 2.3.0 cannot read HDFS 2.3.0?

2014-05-03 Thread Nan Zhu
Hi, all 

I compiled Spark-1.0.0-rc3 against Hadoop 2.3.0 with

SPARK_HADOOP_VERSION=2.3.0 sbt/sbt assembly

and deployed Spark with HDFS 2.3.0 (yarn = false), but it seems that it cannot
read data from HDFS.

The following exception is thrown:

java.lang.VerifyError: class 
 overrides final method 
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(
at Method)
at java.lang.ClassLoader.loadClass(
at sun.misc.Launcher$AppClassLoader.loadClass(
at java.lang.ClassLoader.loadClass(
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(
at java.lang.Class.privateGetPublicMethods(
at java.lang.Class.privateGetPublicMethods(
at java.lang.Class.getMethods(
at sun.misc.ProxyGenerator.generateClassFile(
at sun.misc.ProxyGenerator.generateProxyClass(
at java.lang.reflect.Proxy.getProxyClass0(
at java.lang.reflect.Proxy.newProxyInstance(
at org.apache.hadoop.ipc.ProtobufRpcEngine.getProxy(
at org.apache.hadoop.ipc.RPC.getProtocolProxy(
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(
at org.apache.hadoop.hdfs.DFSClient.(
at org.apache.hadoop.hdfs.DFSClient.(
at org.apache.hadoop.fs.FileSystem.createFileSystem(
at org.apache.hadoop.fs.FileSystem.access$200(
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(
at org.apache.hadoop.fs.FileSystem$Cache.get(
at org.apache.hadoop.fs.FileSystem.get(
at org.apache.hadoop.fs.Path.getFileSystem(
at org.apache.hadoop.mapred.FileInputFormat.listStatus(
at org.apache.hadoop.mapred.FileInputFormat.getSplits(
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1085)
at org.apache.spark.rdd.RDD.count(RDD.scala:836)
at $iwC$$iwC$$iwC$$iwC.(:15)
at $iwC$$iwC$$iwC.(:20)
at $iwC$$iwC.(:22)
at $iwC.(:24)
at (:26)
at .(:30)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at org.apache.spark.repl.SparkIMain$
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)

Re: "sbt/sbt run" command returns a JVM problem

2014-05-03 Thread Michael Armbrust
The problem is probably not with the JVM running sbt but with the one that
sbt is forking to run your program.

See the sbt documentation on forking for the relevant option.

You might try starting sbt with no arguments (to bring up the sbt console).
You can then enter `set javaOptions += "-Xmx1G"` and afterwards try `run`.


On Sat, May 3, 2014 at 5:15 AM, Carter wrote:

> Hi, thanks for all your help.
> I tried your setting in the sbt file, but the problem is still there.
> The Java setting in my sbt file is:
> java \
>   -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m \
>   -jar ${JAR} \
>   "$@"
> I have tried to set these 3 parameters bigger and smaller, but nothing
> works. Did I change the right thing?
> Thank you very much.

Re: Multiple Streams with Spark Streaming

2014-05-03 Thread Chris Fregly
If you want to use true Spark Streaming (not the same as Hadoop
Streaming/Piping, as Mayur pointed out), you can use the DStream.union()
method as described in the Spark Streaming programming guide.

Our friend Diana Carroll, from Cloudera, recently posted a nice little
utility for sending files to a Spark Streaming Receiver to simulate a
streaming scenario from disk.

here's the link to her post:


On Thu, May 1, 2014 at 3:09 AM, Mayur Rustagi wrote:

> File as a stream?
> I think you are confusing Spark Streaming with a buffered reader. Spark
> Streaming is meant to process batches of data (files, packets, messages) as
> they come in, in fact using the time of packet reception as a way to create
> windows, etc.
> In your case you are better off reading the file, partitioning it, and
> operating on each column individually if that makes more sense to you.
> Mayur Rustagi
> On Thu, May 1, 2014 at 3:24 PM, Laeeq Ahmed wrote:
>> Hi all,
>> Is it possible to read and process multiple streams with Spark? I have an
>> EEG (brain waves) CSV file with 23 columns. Each column is one stream (wave),
>> and each column has one million values.
>> I know one way to do it is to take the transpose of the file and then give it
>> to Spark, and each mapper will get one or more waves out of the 23 waves,
>> but then it will be a non-streaming problem, and I want to read the file as a
>> stream. Please correct me if I am wrong.
>> I have to apply simple operations (mean and SD) on each window of a wave.
>> Regards,
>> Laeeq
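Whether the data is read as a stream or as a file, the per-window work Laeeq describes is the same. A minimal pure-Python sketch, assuming fixed-size, non-overlapping windows measured in samples and population SD (both are assumptions; Spark Streaming windows are defined by time durations instead). The function names and sample rows are invented.

```python
import statistics


def window_stats(values, window):
    """(mean, population SD) for each full, non-overlapping window of one wave.

    A trailing window with fewer than `window` values is skipped.
    """
    stats = []
    for start in range(0, len(values) - window + 1, window):
        chunk = values[start:start + window]
        stats.append((statistics.mean(chunk), statistics.pstdev(chunk)))
    return stats


def per_column_stats(rows, window):
    """Treat each CSV column as its own wave and summarize every window of it."""
    columns = zip(*rows)  # transpose: one tuple of values per column
    return [window_stats(list(col), window) for col in columns]


# Two made-up waves, four samples each, windows of two samples.
rows = [(1.0, 10.0), (3.0, 10.0), (5.0, 20.0), (7.0, 40.0)]
result = per_column_stats(rows, window=2)
```

Each column is handled independently, which mirrors treating the 23 columns as 23 separate streams.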

Re: Reading multiple S3 objects, transforming, writing back one

2014-05-03 Thread Chris Fregly
Not sure if this directly addresses your issue, Peter, but it's worth
mentioning a handy AWS EMR utility called s3distcp that can upload a single
HDFS file (in parallel) to a single, concatenated S3 file once all the
partitions are uploaded. Kinda cool.

here's some info:

s3distcp is an extension of the familiar hadoop distcp, of course.
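The concatenate-after-writing idea, sketched for a local filesystem in Python: read the part files in name order and append them to a single output. The function name is hypothetical, and this does not touch S3 at all; on S3 the same merge would have to happen through a tool like s3distcp or by downloading the parts first.

```python
import glob
import os
import shutil
import tempfile


def concat_parts(out_dir, dest_path):
    """Concatenate the part-* files of a saveAsTextFile-style directory into one file.

    Zero-padded part names sort in partition order, so name order is output order.
    """
    parts = sorted(glob.glob(os.path.join(out_dir, "part-*")))
    with open(dest_path, "wb") as dest:
        for part in parts:
            with open(part, "rb") as src:
                shutil.copyfileobj(src, dest)
    return len(parts)


# Usage: fake a two-partition output directory, then merge it.
demo_dir = tempfile.mkdtemp()
for name, text in [("part-00001", "b\n"), ("part-00000", "a\n")]:
    with open(os.path.join(demo_dir, name), "w") as f:
        f.write(text)
merged = os.path.join(demo_dir, "merged.csv")
n_parts = concat_parts(demo_dir, merged)
with open(merged) as f:
    merged_text = f.read()
```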

On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas wrote:

> The fastest way to save to S3 should be to leave the RDD with many
> partitions, because all partitions will be written out in parallel.
> Then, once the various parts are in S3, somehow concatenate the files
> together into one file.
> If this can be done within S3 (I don't know if this is possible), then you
> get the best of both worlds: a highly parallelized write to S3, and a
> single cleanly named output file.
> On Thu, May 1, 2014 at 12:52 PM, Peter wrote:
>> Thank you, Patrick.
>> I took a quick stab at it:
>> import com.amazonaws.services.s3.AmazonS3Client
>> import com.amazonaws.services.s3.model.DeleteObjectsRequest
>> import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion
>> import scala.collection.JavaConverters._
>> val s3Client = new AmazonS3Client(...)
>> // Copy the single coalesced part file to its final name
>> val copyObjectResult = s3Client.copyObject("upload", outputPrefix +
>>   "/part-0", "rolled-up-logs", "2014-04-28.csv")
>> // Delete everything that was written under the temporary prefix
>> val objectListing = s3Client.listObjects("upload", outputPrefix)
>> s3Client.deleteObjects(new DeleteObjectsRequest("upload").withKeys(
>>   objectListing.getObjectSummaries.asScala.map(s => new KeyVersion(s.getKey)).asJava))
>> Using a 3GB object, I achieved about 33MB/s between buckets in the same
>> AZ.
>> This is a workable solution for the short term but not ideal for the
>> longer term as data size increases. I understand it's a limitation of the
>> Hadoop API, but ultimately it must be possible to dump an RDD to a single S3
>> object :)
>> On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell wrote:
>>  This is a consequence of the way the Hadoop files API works. However,
>> you can (fairly easily) add code to just rename the file because it
>> will always produce the same filename.
>> (heavy use of pseudo code)
>> dir = "/some/dir"
>> rdd.coalesce(1).saveAsTextFile(dir)
>> f = new File(dir + "part-0")
>> f.moveTo("somewhere else")
>> dir.remove()
>> It might be cool to add a utility called `saveAsSingleFile` or
>> something that does this for you. In fact, we probably should have
>> called saveAsTextFile "saveAsTextFiles" to make it clearer...
>> On Wed, Apr 30, 2014 at 2:00 PM, Peter wrote:
>> > Thanks Nicholas, this is a bit of a shame, not very practical for log
>> > roll-up, for example, when every output needs to be in its own "directory".
>> > On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas wrote:
>> > Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
>> > coalesce(1), you move everything in the RDD to a single partition, which
>> > then gives you 1 output file.
>> > It will still be called part-0 or something like that because that's
>> > defined by the Hadoop API that Spark uses for reading from/writing to
>> > S3. I don't know of a way to change that.
>> >
>> >
>> > On Wed, Apr 30, 2014 at 2:47 PM, Peter wrote:
>> >
>> > Ah, looks like RDD.coalesce(1) solves one part of the problem.
>> > On Wednesday, April 30, 2014 11:15 AM, Peter wrote:
>> > Hi
>> >
>> > Playing around with Spark & S3, I'm opening multiple objects (CSV files)
>> > with:
>> >
>> > val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>> >
>> > so hfile is an RDD representing 10 objects that were "underneath"
>> > 2014-04-28.
>> > After I've sorted and otherwise transformed the content, I'm trying to
>> > write it back to a single object:
>> >
>> > Unfortunately, this results in a "folder" named concatted.csv with 10
>> > objects underneath, part-0 .. part-00010, corresponding to the 10
>> > original objects loaded.
>> >
>> > How can I achieve the desired behaviour of putting a single object named
>> > concatted.csv?
>> >
>> > I've tried 0.9.1 and 1.0.0-RC3.
>> >
>> > Thanks!
>> > Peter
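Patrick's rename pseudo code, made concrete for a local filesystem in Python. The helper name and paths are hypothetical. On HDFS the move would go through Hadoop's FileSystem.rename instead, and on S3 it becomes a copy followed by a delete, as in Peter's AmazonS3Client snippet.

```python
import glob
import os
import shutil
import tempfile


def promote_single_part(out_dir, dest_path):
    """Move the only part-* file out of a saveAsTextFile directory, then delete the directory.

    Assumes the RDD was coalesced to one partition before saving.
    """
    parts = glob.glob(os.path.join(out_dir, "part-*"))
    if len(parts) != 1:
        raise ValueError("expected exactly one part file, found %d" % len(parts))
    shutil.move(parts[0], dest_path)
    shutil.rmtree(out_dir)  # removes anything else left in the directory


# Usage: fake what coalesce(1).saveAsTextFile(out_dir) would leave behind locally.
root = tempfile.mkdtemp()
out_dir = os.path.join(root, "concatted_tmp")
os.makedirs(out_dir)
with open(os.path.join(out_dir, "part-00000"), "w") as f:
    f.write("x,1\ny,2\n")
dest = os.path.join(root, "concatted.csv")
promote_single_part(out_dir, dest)
with open(dest) as f:
    dest_text = f.read()
```

Refusing to run when there is more than one part file guards against silently keeping only a fraction of the output if the coalesce step was skipped.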

Re: Reading multiple S3 objects, transforming, writing back one

2014-05-03 Thread Patrick Wendell
Hi Peter,

If your dataset is large (3GB), then why not just chunk it into
multiple files? You'll need to do this anyway if you want to
read/write it from S3 quickly, because S3's throughput per file is
limited (as you've seen).

This is exactly why the Hadoop API lets you save datasets with many
partitions, since often there are bottlenecks at the granularity of a
file.

Is there a reason you need this to be exactly one file?

- Patrick

On Sat, May 3, 2014 at 4:14 PM, Chris Fregly wrote:
> Not sure if this directly addresses your issue, Peter, but it's worth
> mentioning a handy AWS EMR utility called s3distcp that can upload a single
> HDFS file (in parallel) to a single, concatenated S3 file once all the
> partitions are uploaded. Kinda cool.
> here's some info:
> s3distcp is an extension of the familiar hadoop distcp, of course.
> On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas wrote:
>> The fastest way to save to S3 should be to leave the RDD with many
>> partitions, because all partitions will be written out in parallel.
>> Then, once the various parts are in S3, somehow concatenate the files
>> together into one file.
>> If this can be done within S3 (I don't know if this is possible), then you
>> get the best of both worlds: a highly parallelized write to S3, and a single
>> cleanly named output file.
>> On Thu, May 1, 2014 at 12:52 PM, Peter wrote:
>>> Thank you, Patrick.
>>> I took a quick stab at it:
>>> import com.amazonaws.services.s3.AmazonS3Client
>>> import com.amazonaws.services.s3.model.DeleteObjectsRequest
>>> import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion
>>> import scala.collection.JavaConverters._
>>> val s3Client = new AmazonS3Client(...)
>>> // Copy the single coalesced part file to its final name
>>> val copyObjectResult = s3Client.copyObject("upload", outputPrefix +
>>>   "/part-0", "rolled-up-logs", "2014-04-28.csv")
>>> // Delete everything that was written under the temporary prefix
>>> val objectListing = s3Client.listObjects("upload", outputPrefix)
>>> s3Client.deleteObjects(new DeleteObjectsRequest("upload").withKeys(
>>>   objectListing.getObjectSummaries.asScala.map(s => new KeyVersion(s.getKey)).asJava))
>>> Using a 3GB object, I achieved about 33MB/s between buckets in the same
>>> AZ.
>>> This is a workable solution for the short term but not ideal for the
>>> longer term as data size increases. I understand it's a limitation of the
>>> Hadoop API, but ultimately it must be possible to dump an RDD to a single S3
>>> object :)
>>> On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell wrote:
>>> This is a consequence of the way the Hadoop files API works. However,
>>> you can (fairly easily) add code to just rename the file because it
>>> will always produce the same filename.
>>> (heavy use of pseudo code)
>>> dir = "/some/dir"
>>> rdd.coalesce(1).saveAsTextFile(dir)
>>> f = new File(dir + "part-0")
>>> f.moveTo("somewhere else")
>>> dir.remove()
>>> It might be cool to add a utility called `saveAsSingleFile` or
>>> something that does this for you. In fact, we probably should have
>>> called saveAsTextFile "saveAsTextFiles" to make it clearer...
>>> On Wed, Apr 30, 2014 at 2:00 PM, Peter wrote:
>>> > Thanks Nicholas, this is a bit of a shame, not very practical for log
>>> > roll-up, for example, when every output needs to be in its own "directory".
>>> > On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas wrote:
>>> > Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
>>> > coalesce(1), you move everything in the RDD to a single partition, which
>>> > then gives you 1 output file.
>>> > It will still be called part-0 or something like that because that's
>>> > defined by the Hadoop API that Spark uses for reading from/writing to
>>> > S3. I don't know of a way to change that.
>>> >
>>> >
>>> > On Wed, Apr 30, 2014 at 2:47 PM, Peter wrote:
>>> >
>>> > Ah, looks like RDD.coalesce(1) solves one part of the problem.
>>> > On Wednesday, April 30, 2014 11:15 AM, Peter wrote:
>>> > Hi
>>> >
>>> > Playing around with Spark & S3, I'm opening multiple objects (CSV
>>> > files) with:
>>> >
>>> > val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>>> >
>>> > so hfile is an RDD representing 10 objects that were "underneath"
>>> > 2014-04-28.
>>> > After I've sorted and otherwise transformed the content, I'm trying to
>>> > write it back to a single object:
>>> >
>>> >
>>> >
>>> >",")).saveAsTextFile("s3n://bucket/concatted.csv")
>>> >
>>> > Unfortunately, this results in a "folder" named concatted.csv with 10
>>> > objects underneath, part-0 .. part-00010, corresponding to the 10
>>> > original objects loaded.
>>> >
>>> > How can I achieve the desired behaviour of putting a single object
>>> > named concatted.csv?
>>> >
>>> > I've tried 0.9.1 and 1.0.0-RC3.
>>> >
>>> > Thanks!
>>> > Peter

Re: Setting the Scala version in the EC2 script?

2014-05-03 Thread Patrick Wendell
Spark will only work with Scala 2.10... are you trying to do a minor
version upgrade or upgrade to a totally different version?

You could do this as follows if you want:
1. Fork the spark-ec2 repository and change the file here:
2. Modify your script to check out spark-ec2 from your forked version.

- Patrick

On Thu, May 1, 2014 at 2:14 PM, Ian Ferreira wrote:
> Is this possible? It is very annoying to have such a great script but still
> have to manually update stuff afterwards.

Re: when to use broadcast variables

2014-05-03 Thread Patrick Wendell
Broadcast variables need to fit entirely in memory - so that's a
pretty good litmus test for whether or not to broadcast a smaller
dataset or turn it into an RDD.

On Fri, May 2, 2014 at 7:50 AM, Prashant Sharma wrote:
> I'd like to be corrected on this, but I'd say small enough means on the order
> of a few hundred MBs. Keep in mind that it gets shipped to all nodes: it can
> be a GB, but not several GBs, and then it depends on the network too.
> Prashant Sharma
> On Fri, May 2, 2014 at 6:42 PM, Diana Carroll wrote:
>> Anyone have any guidance on using a broadcast variable to ship data to
>> workers vs. an RDD?
>> Like, say I'm joining web logs in an RDD with user account data.  I could
>> keep the account data in an RDD or if it's "small", a broadcast variable
>> instead.  How small is small?  Small enough that I know it can easily fit in
>> memory on a single node?  Some other guideline?
>> Thanks!
>> Diana
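A minimal pure-Python sketch of what the broadcast option amounts to, using invented web-log and account records: the small side is a dictionary every task can read, so each log record is joined by lookup with no shuffle. In PySpark the dictionary would be wrapped with `sc.broadcast(accounts)` and read through its `.value` attribute inside a map or flatMap; the function name and record layout here are hypothetical.

```python
def map_side_join(log_records, accounts):
    """Inner-join (user_id, url) log records against an in-memory accounts dict.

    This is the per-record work each task does when the small side is broadcast:
    a dictionary lookup instead of a shuffle. Unmatched records are dropped.
    """
    joined = []
    for user_id, url in log_records:
        account = accounts.get(user_id)
        if account is not None:
            joined.append((user_id, url, account))
    return joined


accounts = {"u1": "gold", "u2": "free"}  # small side: must fit in memory on every node
logs = [("u1", "/home"), ("u3", "/faq"), ("u2", "/buy")]
result = map_side_join(logs, accounts)
```

This is where Patrick's litmus test bites: the whole `accounts` dictionary has to fit in memory on each node, whereas an RDD-to-RDD join has no such requirement.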

Re: performance improvement on second operation...without caching?

2014-05-03 Thread Matei Zaharia
Hi Diana,

Apart from these reasons, in a multi-stage job, Spark saves the map output
files from map stages to the filesystem, so it only needs to rerun the last
reduce stage. This is why you only saw one stage executing. These files are
saved for fault recovery, but they also speed up subsequent runs.


On May 3, 2014, at 5:21 PM, Patrick Wendell wrote:

> Ethan,
> What you said is actually not true: Spark won't cache RDDs unless you ask it
> to.
> The observation here (that running the same job can speed up substantially
> even without caching) is common. This is because other components in the
> stack are performing caching and optimizations. Two that can make a huge
> difference are:
> 1. The OS buffer cache, which will keep recently read disk blocks in memory.
> 2. The Java just-in-time compiler (JIT), which will use runtime profiling to
> significantly speed up execution.
> These can make a huge difference if you are running the same job
> over and over. And there are other things, like the OS network stack
> increasing TCP windows and so forth. These will all improve response time as
> a Spark program executes.
> On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett wrote:
> I believe Spark caches RDDs it has memory for regardless of whether you 
> actually call the 'cache' method on the RDD. The 'cache' method just tips off 
> Spark that the RDD should have higher priority. At least, that is my 
> experience and it seems to correspond with your experience and with my 
> recollection of other discussions on this topic on the list. However, going 
> back and looking at the programming guide, this is not the way the 
> cache/persist behavior is described. Does the guide need to be updated?
> On Fri, May 2, 2014 at 9:04 AM, Diana Carroll wrote:
> I'm just Posty McPostalot this week, sorry folks! :-)
> Anyway, another question today:
> I have a bit of code that is pretty time consuming (pasted at the end of the 
> message):
> It reads in a bunch of XML files, parses them, extracts some data in a map, 
> counts (using reduce), and then sorts.   All stages are executed when I do a 
> final operation (take).  The first stage is the most expensive: on first run 
> it takes 30s to a minute.
> I'm not caching anything.
> When I re-execute that take at the end, I expected it to re-execute all the 
> same stages, and take approximately the same amount of time, but it didn't.  
> The second "take" executes only a single stage, which runs very
> fast: the whole operation takes less than 1 second (down from 5 minutes!)
> While this is awesome (!), I don't understand it. If I'm not caching data,
> why would I see such a marked performance improvement on subsequent execution?
> (Or is this related to the known 0.9.1 bug about sortByKey executing an action
> when it shouldn't?)
> Thanks,
> Diana
> # load XML files containing device activation records.
> # Find the most common device models activated
> import xml.etree.ElementTree as ElementTree
> # Given a partition containing multi-line XML, parse the contents. 
> # Return an iterator of activation Elements contained in the partition
> def getactivations(fileiterator):
>     s = ''.join(str(i) for i in fileiterator)
>     filetree = ElementTree.fromstring(s)
>     return filetree.iter('activation')
> # Get the model name from a device activation record
> def getmodel(activation):
>     return activation.find('model').text
> filename="hdfs://localhost/user/training/activations/*.xml"
> # parse each partition as a file into an activation XML record
> activations = sc.textFile(filename)
> activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
> models = activationTrees.map(lambda activation: getmodel(activation))
> # count and sort activations by model
> topmodels = models.map(lambda model: (model,1))\
>     .reduceByKey(lambda v1,v2: v1+v2)\
>     .map(lambda pair: (pair[1], pair[0]))\
>     .sortByKey(ascending=False)
> # display the top 10 models
> for (count,model) in topmodels.take(10):
>     print("Model %s (%s)" % (model,count))
> # repeat!
> for (count,model) in topmodels.take(10):
>     print("Model %s (%s)" % (model,count))
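The parse/count/sort logic in Diana's job can be exercised without a cluster. Below is a plain-Python sketch of the same steps, assuming (as the post does) that each partition holds one complete XML document whose `<activation>` elements each contain a `<model>` child; the function names and sample data are illustrative, not from the thread. Note that `sc.textFile` partitions by Hadoop input split rather than by file, so the one-file-per-partition assumption holds only when each file fits in a single split.

```python
import xml.etree.ElementTree as ElementTree
from collections import Counter


def get_activations(lines):
    """Parse one partition's lines as a single XML document.

    Assumes the partition holds exactly one complete XML file, as
    getactivations() in the post does. Returns an iterator over the
    <activation> elements (Element.iter replaces the deprecated getiterator).
    """
    root = ElementTree.fromstring("".join(lines))
    return root.iter("activation")


def get_model(activation):
    """Text of the <model> child of one activation record."""
    return activation.find("model").text


def top_models(partitions, n=10):
    """Count models over all partitions; return the n largest as (count, model).

    Mirrors map((model, 1)) -> reduceByKey(+) -> swap to (count, model)
    -> sortByKey(ascending=False) -> take(n). Ties are broken by model name
    (descending) here, which Spark does not guarantee.
    """
    counts = Counter(
        get_model(activation)
        for lines in partitions
        for activation in get_activations(lines)
    )
    return sorted(((c, m) for m, c in counts.items()), reverse=True)[:n]
```

With two small documents containing models A, B and A, `top_models` returns `[(2, 'A'), (1, 'B')]`.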

Re: Spark's behavior

2014-05-03 Thread Andrew Ash
Hi Eduardo,

Yep those machines look pretty well synchronized at this point.  Just
wanted to throw that out there and eliminate it as a possible source of

Good luck on continuing the debugging!

On Sat, May 3, 2014 at 11:59 AM, Eduardo Costa Alfaia wrote:

> Hi TD,
> I did a test with 8 workers and 1 word source; the time gap was 27 sec,
> as you can see in the attached log files.
> Hi Andrew,
> I configured NTP, and all machines are synchronized.
> root@computer8:/opt/unibs_test/spark-1.0RC3# for num in
> {1,8,10,11,13,15,16,18,22}; do ssh computer$num date; done
> Sat May  3 20:57:41 CEST 2014
> Sat May  3 20:57:41 CEST 2014
> Sat May  3 20:57:41 CEST 2014
> Sat May  3 20:57:42 CEST 2014
> Sat May  3 20:57:42 CEST 2014
> Sat May  3 20:57:42 CEST 2014
> Sat May  3 20:57:42 CEST 2014
> Sat May  3 20:57:42 CEST 2014
> Sat May  3 20:57:42 CEST 2014
> On May 3, 2014, at 15:46, Eduardo Costa Alfaia 
> wrote:
> Hi TD, thanks for the reply.
> I did this last experiment on one computer, in local mode, but I think the
> time gap grows when I add more computers. I will run it again now with 8
> workers and 1 word source and see what's going on. I will also check the
> clocks, as Andrew suggested.
> On May 3, 2014, at 1:19, Tathagata Das 
> wrote:
> From the logs, I see that the print() starts printing stuff 10 seconds
> after the context is started. And that 10 seconds is taken by the initial
> empty job (50 map + 20 reduce tasks) that spark streaming starts to ensure
> all the executors have started. Somehow the first empty task takes 7-8
> seconds to complete. See if this can be reproduced by running a simple,
> empty job in spark shell (in the same cluster) and see if the first task
> takes 7-8 seconds.
> Either way, I didn't see the 30-second gap, only a 10-second gap. And that
> does not seem to be a persistent problem as after that 10 seconds, the data
> is being received and processed.
> TD
> On Fri, May 2, 2014 at 2:14 PM, Eduardo Costa Alfaia wrote:
>> Hi TD,
>> I got more information today using Spark 1.0 RC3, and the situation
>> remains the same:
>> The output lines begin after 17 sec:
>> 14/05/02 21:52:25 INFO SparkDeploySchedulerBackend: Granted executor ID
>> app-20140502215225-0005/0 on hostPort computer8.ant-net:57229 with 2 cores,
>> 2.0 GB RAM
>> 14/05/02 21:52:25 INFO AppClient$ClientActor: Executor updated:
>> app-20140502215225-0005/0 is now RUNNING
>> 14/05/02 21:52:25 INFO ReceiverTracker: ReceiverTracker started
>> 14/05/02 21:52:26 INFO ForEachDStream: metadataCleanupDelay = -1
>> 14/05/02 21:52:26 INFO SocketInputDStream: metadataCleanupDelay = -1
>> 14/05/02 21:52:26 INFO SocketInputDStream: Slide time = 1000 ms
>> 14/05/02 21:52:26 INFO SocketInputDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>> 14/05/02 21:52:26 INFO SocketInputDStream: Checkpoint interval = null
>> 14/05/02 21:52:26 INFO SocketInputDStream: Remember duration = 1000 ms
>> 14/05/02 21:52:26 INFO SocketInputDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.SocketInputDStream@5433868e
>> 14/05/02 21:52:26 INFO ForEachDStream: Slide time = 1000 ms
>> 14/05/02 21:52:26 INFO ForEachDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>> 14/05/02 21:52:26 INFO ForEachDStream: Checkpoint interval = null
>> 14/05/02 21:52:26 INFO ForEachDStream: Remember duration = 1000 ms
>> 14/05/02 21:52:26 INFO ForEachDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.ForEachDStream@1ebdbc05
>> 14/05/02 21:52:26 INFO SparkContext: Starting job: collect at
>> ReceiverTracker.scala:270
>> 14/05/02 21:52:26 INFO RecurringTimer: Started timer for JobGenerator at
>> time 1399060346000
>> 14/05/02 21:52:26 INFO JobGenerator: Started JobGenerator at
>> 1399060346000 ms
>> 14/05/02 21:52:26 INFO JobScheduler: Started JobScheduler
>> 14/05/02 21:52:26 INFO DAGScheduler: Registering RDD 3 (reduceByKey at
>> ReceiverTracker.scala:270)
>> 14/05/02 21:52:26 INFO ReceiverTracker: Stream 0 received 0 blocks
>> 14/05/02 21:52:26 INFO DAGScheduler: Got job 0 (collect at
>> ReceiverTracker.scala:270) with 20 output partitions (allowLocal=false)
>> 14/05/02 21:52:26 INFO DAGScheduler: Final stage: Stage 0(collect at
>> ReceiverTracker.scala:270)
>> 14/05/02 21:52:26 INFO DAGScheduler: Parents of final stage: List(Stage 1)
>> 14/05/02 21:52:26 INFO JobScheduler: Added jobs for time 1399060346000 ms
>> 14/05/02 21:52:26 INFO JobScheduler: Starting job streaming job
>> 1399060346000 ms.0 from job set of time 1399060346000 ms
>> 14/05/02 21:52:26 INFO JobGenerator: Checkpointing graph for time
>> 1399060346000 ms
>> -------------------------------------------
>> 14/05/02 21:52:26 INFO DStreamGraph: Updating checkpoint data for time 1399060346000 ms
>> Time: 1399060346000 ms
>> -------------------------------------------
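Because the loop above runs `ssh computer$num date` one host at a time, each reading also includes SSH connection latency, so a 1-second spread is compatible with well-synchronized clocks. A small helper for computing that spread from pasted `date` output is sketched below; the function name is hypothetical, and this is a rough sanity check rather than a replacement for NTP's own offset reporting.

```python
from datetime import datetime


def clock_spread_seconds(date_lines):
    """Seconds between the earliest and latest timestamp in `date` output.

    Expects the default `date` format, e.g. "Sat May  3 20:57:41 CEST 2014".
    The time-zone token is ignored, so all hosts are assumed to be in the
    same zone (as they are in the output above).
    """
    stamps = []
    for line in date_lines:
        # split() also absorbs the double space before single-digit days
        _weekday, month, day, clock, _zone, year = line.split()
        stamps.append(
            datetime.strptime(f"{year} {month} {day} {clock}", "%Y %b %d %H:%M:%S")
        )
    return (max(stamps) - min(stamps)).total_seconds()
```

Applied to the nine lines Eduardo posted, it returns `1.0`.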

Re: help me

2014-05-03 Thread Chris Fregly
as Mayur indicated, it's odd that you are seeing better performance from a
less-local configuration.  however, the non-deterministic behavior that you
describe is likely caused by GC pauses in your JVM process.

take note of the *spark.locality.wait* configuration parameter described in the Spark configuration documentation.

this is the amount of time the Spark execution engine waits before
launching a new task on a less-data-local node (i.e. process -> node ->
rack).  by default, this is 3 seconds.
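The fallback described here is a form of delay scheduling. The sketch below is a deliberately simplified model, assuming a single wait applied at each level in the order PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY; Spark's real scheduler also tracks when it last launched a task and supports per-level settings (`spark.locality.wait.process`, `.node`, `.rack`), which this ignores. `allowed_locality` is a hypothetical name.

```python
LOCALITY_LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]


def allowed_locality(waited_ms, wait_ms=3000):
    """Most relaxed locality level a simplified delay scheduler would accept.

    Toy model: every `wait_ms` spent without launching a task relaxes the
    constraint by one level; wait_ms defaults to the 3-second default.
    """
    if wait_ms <= 0:
        return LOCALITY_LEVELS[-1]  # no waiting: take any free slot at once
    steps = min(waited_ms // wait_ms, len(LOCALITY_LEVELS) - 1)
    return LOCALITY_LEVELS[steps]
```

For example, `allowed_locality(3500)` returns `"NODE_LOCAL"`: after one full wait, a task may go to a node-local executor even if the process-local one is merely stuck in a GC pause.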

if there is excessive GC occurring on the original process-local JVM, it is
possible that another node-local JVM process could actually load the data
from HDFS (on the same node) and complete the processing before the
original process's GC finishes.

you could bump up the *spark.locality.wait* default (not recommended) or
increase your number of nodes/partitions to increase parallelism and reduce

also, keep an eye on your GC characteristics.  perhaps you need to increase
your Eden size to reduce the amount of movement through the GC generations
and reduce major compactions.  (the usual GC tuning fun.)

curious if others have experienced this behavior, as well?


On Fri, May 2, 2014 at 6:07 AM, Mayur Rustagi wrote:

> Spark would be much faster on process_local than on node_local.
> Node_local reads data from the local hard disk, while process_local reads
> data already held in memory in the same executor process.
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> @mayur_rustagi 
> On Tue, Apr 22, 2014 at 4:45 PM, Joe L  wrote:
>> I got the following performance. Is it normal for Spark to behave like
>> this? Sometimes Spark switches into node_local mode from process_local and
>> it becomes 10x faster. I am very confused.
>> scala> val a = sc.textFile("/user/exobrain/batselem/LUBM1000")
>> scala> f.count()
>> Long = 137805557
>> took 130.809661618 s
>> --
>> View this message in context:
>> Sent from the Apache Spark User List mailing list archive at

Re: Equally weighted partitions in Spark

2014-05-03 Thread Chris Fregly
@deenar-  i like the custom partitioner strategy that you mentioned.  i
think it's very useful.

as a thought-exercise, is it possible to re-partition your RDD to
more-evenly distribute the long-running tasks among the short-running tasks
by ordering the keys differently?  this would play nice with the existing

or perhaps manipulate the keys' hashCode() to more-evenly distribute the
tasks to play nicely with the existing HashPartitioner?

i don't know if either of these is beneficial, but throwing them out for
the sake of conversation...
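To reason about the hashCode() idea, it helps to recall that Spark's HashPartitioner picks a partition by taking a non-negative modulo of `key.hashCode` by the partition count. The Python sketch below models that rule with `zlib.crc32` standing in for the JVM hash (Python's own `hash()` of strings is randomized per process) and shows key salting, one common way to spread a single heavy key over several partitions. `salt_key` and the other names are hypothetical helpers, not Spark API.

```python
import zlib
from collections import Counter


def stable_hash(key):
    """Deterministic stand-in for a JVM hashCode (Python's str hash is salted per run)."""
    return zlib.crc32(repr(key).encode("utf-8"))


def hash_partition(key, num_partitions):
    """Non-negative hash modulo the partition count, the HashPartitioner rule."""
    # Python's % with a positive divisor never returns a negative value.
    return stable_hash(key) % num_partitions


def salt_key(key, fanout):
    """Hypothetical helper: split one heavy key into sub-keys (key, 0..fanout-1)."""
    return [(key, i) for i in range(fanout)]


def partition_histogram(keys, num_partitions):
    """How many keys land in each partition under hash_partition."""
    return Counter(hash_partition(k, num_partitions) for k in keys)
```

Salting changes the key, so an aggregation needs a second pass that strips the salt and combines the partial results per original key.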


On Fri, May 2, 2014 at 11:10 AM, Andrew Ash  wrote:

> Deenar,
> I haven't heard of any activity to do partitioning in that way, but it
> does seem more broadly valuable.
> On Fri, May 2, 2014 at 10:15 AM, deenar.toraskar 
> wrote:
>> I have equal-sized partitions now, but I want the RDD to be partitioned
>> such that the partitions are equally weighted by some attribute of each
>> RDD element (e.g. size or complexity).
>> I have been looking at the RangePartitioner code and I have come up with
>> something like
>> EquallyWeightedPartitioner(noOfPartitions, weightFunction)
>> 1) take the sum (or a sample) of the complexities of all elements and
>> calculate the average weight per partition
>> 2) take a histogram of weights
>> 3) assign a list of partitions to each bucket
>> 4) getPartition(key: Any): Int would
>>   a) get the weight and then find the bucket
>>   b) assign a random partition from the list of partitions associated with
>> that bucket
>> I just wanted to know whether anyone else had come across this issue before
>> and whether there was a better way of doing this.
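When the keys and their weights are small enough to collect on the driver, a simpler alternative to the histogram scheme above is greedy longest-processing-time-first (LPT) assignment: take keys from heaviest to lightest and give each one to the currently lightest partition. The Python sketch below computes such a key-to-partition map. It is a swapped-in technique, not deenar's algorithm, and a custom Partitioner would still need to look the map up in getPartition (not shown). Names and weights are illustrative.

```python
import heapq


def assign_by_weight(items, num_partitions, weight):
    """Greedy LPT: heaviest item first, always onto the lightest partition.

    Returns {item: partition_index}. Items must be hashable.
    """
    heap = [(0, p) for p in range(num_partitions)]  # (total weight, partition)
    heapq.heapify(heap)
    assignment = {}
    for item in sorted(items, key=weight, reverse=True):
        load, p = heapq.heappop(heap)  # currently lightest partition
        assignment[item] = p
        heapq.heappush(heap, (load + weight(item), p))
    return assignment


def partition_loads(assignment, num_partitions, weight):
    """Total weight placed on each partition by an assignment."""
    loads = [0] * num_partitions
    for item, p in assignment.items():
        loads[p] += weight(item)
    return loads
```

For weights 7, 5, 4, 3 and 1 over two partitions, this yields loads of `[10, 10]`.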

spark run issue

2014-05-03 Thread Weide Zhang
Hi, I'm trying to run the kafka-word-count example in spark2.9.1. I
encountered an exception when initializing the kafka consumer/producer config.
I'm using scala 2.10.3 and a maven build of the spark streaming kafka
library that comes with spark2.9.1. Has anyone seen this exception before?


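producer:
 [java] The args attribute is deprecated. Please use nested arg elements.
 [java] Exception in thread "main" java.lang.NoClassDefFoundError: scala/Tuple2$mcLL$sp
 [java] at kafka.producer.ProducerConfig.(ProducerConfig.scala:56)
 [java] at com.turn.apache.KafkaWordCountProducer$.main(HelloWorld.scala:89)
 [java] at com.turn.apache.KafkaWordCountProducer.main(HelloWorld.scala)
 [java] Caused by: java.lang.ClassNotFoundException: scala.Tuple2$mcLL$sp
 [java] at$
 [java] at
 [java] at
 [java] at java.lang.ClassLoader.loadClass(
 [java] at sun.misc.Launcher$AppClassLoader.loadClass(
 [java] at java.lang.ClassLoader.loadClass(
 [java] ... 3 more
 [java] Java Result: 1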

Re: spark run issue

2014-05-03 Thread Tathagata Das
I am a little confused about the version of Spark you are using. Are you
using Spark 0.9.1 that uses scala 2.10.3 ?


On Sat, May 3, 2014 at 6:16 PM, Weide Zhang  wrote:

> Hi, I'm trying to run the kafka-word-count example in spark2.9.1. I
> encountered an exception when initializing the kafka consumer/producer config.
> I'm using scala 2.10.3 and a maven build of the spark streaming kafka
> library that comes with spark2.9.1. Has anyone seen this exception before?
> Thanks,
> producer:
>  [java] The args attribute is deprecated. Please use nested arg
> elements.
>  [java] Exception in thread "main" java.lang.NoClassDefFoundError:
> scala/Tuple2$mcLL$sp
>  [java] at
> kafka.producer.ProducerConfig.(ProducerConfig.scala:56)
>  [java] at
> com.turn.apache.KafkaWordCountProducer$.main(HelloWorld.scala:89)
>  [java] at
> com.turn.apache.KafkaWordCountProducer.main(HelloWorld.scala)
>  [java] Caused by: java.lang.ClassNotFoundException:
> scala.Tuple2$mcLL$sp
>  [java] at$
>  [java] at
> Method)
>  [java] at
>  [java] at java.lang.ClassLoader.loadClass(
>  [java] at
> sun.misc.Launcher$AppClassLoader.loadClass(
>  [java] at java.lang.ClassLoader.loadClass(
>  [java] ... 3 more
>  [java] Java Result: 1

Re: spark run issue

2014-05-03 Thread Weide Zhang
Hi Tathagata,

I figured out the reason. I was adding the wrong kafka lib alongside
the version spark uses. Sorry for spamming.


On Sat, May 3, 2014 at 7:04 PM, Tathagata Das

> I am a little confused about the version of Spark you are using. Are you
> using Spark 0.9.1 that uses scala 2.10.3 ?
> TD
> On Sat, May 3, 2014 at 6:16 PM, Weide Zhang  wrote:
>> Hi, I'm trying to run the kafka-word-count example in spark2.9.1. I
>> encountered an exception when initializing the kafka consumer/producer config.
>> I'm using scala 2.10.3 and a maven build of the spark streaming kafka
>> library that comes with spark2.9.1. Has anyone seen this exception before?
>> Thanks,
>> producer:
>>  [java] The args attribute is deprecated. Please use nested arg
>> elements.
>>  [java] Exception in thread "main" java.lang.NoClassDefFoundError:
>> scala/Tuple2$mcLL$sp
>>  [java] at
>> kafka.producer.ProducerConfig.(ProducerConfig.scala:56)
>>  [java] at
>> com.turn.apache.KafkaWordCountProducer$.main(HelloWorld.scala:89)
>>  [java] at
>> com.turn.apache.KafkaWordCountProducer.main(HelloWorld.scala)
>>  [java] Caused by: java.lang.ClassNotFoundException:
>> scala.Tuple2$mcLL$sp
>>  [java] at$
>>  [java] at
>> Method)
>>  [java] at
>>  [java] at java.lang.ClassLoader.loadClass(
>>  [java] at
>> sun.misc.Launcher$AppClassLoader.loadClass(
>>  [java] at java.lang.ClassLoader.loadClass(
>>  [java] ... 3 more
>>  [java] Java Result: 1
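A `NoClassDefFoundError` for a `scala/...` class usually means some jar on the classpath was compiled against a different Scala version than the one used at runtime, which fits Weide's finding of a second Kafka jar. The sketch below is a hypothetical helper that flags artifacts appearing with more than one Scala suffix or version among jar file names. It assumes the common `artifact_scalaVersion-version.jar` naming used by cross-built Scala libraries, which not every jar follows; jars that don't match are skipped.

```python
import re
from collections import defaultdict

# Assumed naming: name[_scalaVersion]-version.jar, e.g. kafka_2.10-0.8.0.jar
JAR_NAME = re.compile(
    r"^(?P<name>[A-Za-z0-9.\-]+?)"
    r"(?:_(?P<scala>\d+\.\d+(?:\.\d+)?))?"
    r"-(?P<version>\d[\w.\-]*)\.jar$"
)


def find_conflicts(jar_paths):
    """Map artifact name -> sorted (scala suffix, version) pairs for names seen with >1 variant."""
    seen = defaultdict(set)
    for path in jar_paths:
        match = JAR_NAME.match(path.rsplit("/", 1)[-1])
        if match:  # names that don't follow the convention are skipped
            seen[match.group("name")].add((match.group("scala"), match.group("version")))
    return {
        name: sorted(variants, key=str)
        for name, variants in seen.items()
        if len(variants) > 1
    }
```

For example, passing `kafka_2.10-0.8.0.jar` and `kafka_2.9.2-0.8.0.jar` together reports `kafka` with both variants; the file names are made up for illustration.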

Re: spark run issue

2014-05-03 Thread Weide Zhang
Hi Tathagata,

I actually have a separate question. What is the lib_managed folder
inside the spark source folder used for? Are those the libraries required
for spark streaming to run? Do they need to be added to the spark classpath
when starting a spark cluster?


On Sat, May 3, 2014 at 7:08 PM, Weide Zhang  wrote:

> Hi Tathagata,
> I figured out the reason. I was adding the wrong kafka lib alongside
> the version spark uses. Sorry for spamming.
> Weide
> On Sat, May 3, 2014 at 7:04 PM, Tathagata Das wrote:
>> I am a little confused about the version of Spark you are using. Are you
>> using Spark 0.9.1 that uses scala 2.10.3 ?
>> TD
>> On Sat, May 3, 2014 at 6:16 PM, Weide Zhang  wrote:
>>> Hi, I'm trying to run the kafka-word-count example in spark2.9.1. I
>>> encountered an exception when initializing the kafka consumer/producer config.
>>> I'm using scala 2.10.3 and a maven build of the spark streaming kafka
>>> library that comes with spark2.9.1. Has anyone seen this exception before?
>>> Thanks,
>>> producer:
>>>  [java] The args attribute is deprecated. Please use nested arg
>>> elements.
>>>  [java] Exception in thread "main" java.lang.NoClassDefFoundError:
>>> scala/Tuple2$mcLL$sp
>>>  [java] at
>>> kafka.producer.ProducerConfig.(ProducerConfig.scala:56)
>>>  [java] at
>>> com.turn.apache.KafkaWordCountProducer$.main(HelloWorld.scala:89)
>>>  [java] at
>>> com.turn.apache.KafkaWordCountProducer.main(HelloWorld.scala)
>>>  [java] Caused by: java.lang.ClassNotFoundException:
>>> scala.Tuple2$mcLL$sp
>>>  [java] at$
>>>  [java] at
>>> Method)
>>>  [java] at
>>>  [java] at java.lang.ClassLoader.loadClass(
>>>  [java] at
>>> sun.misc.Launcher$AppClassLoader.loadClass(
>>>  [java] at java.lang.ClassLoader.loadClass(
>>>  [java] ... 3 more
>>>  [java] Java Result: 1

cache not work as expected for iteration?

2014-05-03 Thread Earthson
I'm using spark for an LDA implementation. I need to cache an RDD for the next
step of Gibbs sampling; once the new result is cached, the previously cached
RDD could be uncached. Something like an LRU cache should evict the previous
cache because it is never used again, but the cache behaves in a confusing way:

Here is the code:)




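Earthson's code did not survive in the archive, so the following is not his implementation. It sketches one common pattern for iterative jobs: cache and materialize the new RDD before unpersisting the previous one, instead of relying on the block manager to evict old data. It assumes objects exposing `cache()`, `count()` and `unpersist()`, as PySpark RDDs do; `iterate_with_cache` and `step` are hypothetical names.

```python
def iterate_with_cache(initial, step, iterations):
    """Apply `step` repeatedly, keeping only the latest result cached.

    `initial` and every value returned by `step` must expose cache(),
    count() and unpersist(), which PySpark RDDs provide.
    """
    current = initial.cache()
    current.count()  # materialize so the first step reads cached data
    for _ in range(iterations):
        nxt = step(current).cache()
        nxt.count()          # compute and cache the new RDD first...
        current.unpersist()  # ...then drop the old one, which is no longer needed
        current = nxt
    return current
```

In PySpark, `step` might be something like `lambda rdd: rdd.map(resample)`, with `resample` standing for a hypothetical per-record sampling function.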

Re: performance improvement on second operation...without caching?

2014-05-03 Thread Koert Kuipers
Hey Matei,
Not sure I understand that. These are 2 separate jobs. So the second job
takes advantage of the fact that there is map output left somewhere on disk
from the first job, and re-uses that?

On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia wrote:

> Hi Diana,
> Apart from these reasons, in a multi-stage job, Spark saves the map output
> files from map stages to the filesystem, so it only needs to rerun the last
> reduce stage. This is why you only saw one stage executing. These files are
> saved for fault recovery but they speed up subsequent runs.
> Matei
> On May 3, 2014, at 5:21 PM, Patrick Wendell  wrote:
> Ethan,
> What you said is actually not true: Spark won't cache RDDs unless you ask
> it to.
> The observation here, that running the same job can speed up
> substantially even without caching, is common. This is because other
> components in the stack are performing caching and optimizations. Two that
> can make a huge difference are:
> 1. The OS buffer cache, which keeps recently read disk blocks in
> memory.
> 2. The Java just-in-time (JIT) compiler, which uses runtime profiling
> to significantly speed up execution.
> These can make a huge difference if you are running the same job
> over and over. There are other things too, like the OS network stack
> increasing TCP windows and so forth. These all improve response time
> as a Spark program executes.
> On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett  wrote:
>> I believe Spark caches RDDs it has memory for regardless of whether you
>> actually call the 'cache' method on the RDD. The 'cache' method just tips
>> off Spark that the RDD should have higher priority. At least, that is my
>> experience and it seems to correspond with your experience and with my
>> recollection of other discussions on this topic on the list. However, going
>> back and looking at the programming guide, this is not the way the
>> cache/persist behavior is described. Does the guide need to be updated?
>> On Fri, May 2, 2014 at 9:04 AM, Diana Carroll wrote:
>>> I'm just Posty McPostalot this week, sorry folks! :-)
>>> Anyway, another question today:
>>> I have a bit of code that is pretty time consuming (pasted at the end of
>>> the message):
>>> It reads in a bunch of XML files, parses them, extracts some data in a
>>> map, counts (using reduce), and then sorts.   All stages are executed when
>>> I do a final operation (take).  The first stage is the most expensive: on
>>> first run it takes 30s to a minute.
>>> I'm not caching anything.
>>> When I re-execute that take at the end, I expected it to re-execute all
>>> the same stages, and take approximately the same amount of time, but it
>>> didn't.  The second "take" executes only a single stage, which runs
>>> very fast: the whole operation takes less than 1 second (down from 5
>>> minutes!)
>>> While this is awesome (!) I don't understand it.  If I'm not caching
>>> data, why would I see such a marked performance improvement on subsequent
>>> execution?
>>> (or is this related to the known 0.9.1 bug about sortByKey executing an
>>> action when it shouldn't?)
>>> Thanks,
>>> Diana
>>> # load XML files containing device activation records.
>>> # Find the most common device models activated
>>> import xml.etree.ElementTree as ElementTree
>>> # Given a partition containing multi-line XML, parse the contents.
>>> # Return an iterator of activation Elements contained in the partition
>>> def getactivations(fileiterator):
>>>     s = ''.join(str(i) for i in fileiterator)
>>>     filetree = ElementTree.fromstring(s)
>>>     return filetree.iter('activation')
>>> # Get the model name from a device activation record
>>> def getmodel(activation):
>>>     return activation.find('model').text
>>> filename="hdfs://localhost/user/training/activations/*.xml"
>>> # parse each partition as a file into an activation XML record
>>> activations = sc.textFile(filename)
>>> activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
>>> models = activationTrees.map(lambda activation: getmodel(activation))
>>> # count and sort activations by model
>>> topmodels = models.map(lambda model: (model,1))\
>>>     .reduceByKey(lambda v1,v2: v1+v2)\
>>>     .map(lambda pair: (pair[1], pair[0]))\
>>>     .sortByKey(ascending=False)
>>> # display the top 10 models
>>> for (count,model) in topmodels.take(10):
>>>     print("Model %s (%s)" % (model,count))
>>> # repeat!
>>> for (count,model) in topmodels.take(10):
>>>     print("Model %s (%s)" % (model,count))

Re: "sbt/sbt run" command returns a JVM problem

2014-05-03 Thread Carter
Hi Michael,

Thank you very much for your reply.

Sorry, I am not very familiar with sbt. Could you tell me where to set the
Java option for the sbt fork for my program? I brought up the sbt console
and ran 'set javaOptions += "-Xmx1G"' in it, but it returned an error:
[error] object scala not found.
[error] Use 'last' for the full log.

Is this the right way to set the java option? Thank you very much.


Re: "sbt/sbt run" command returns a JVM problem

2014-05-03 Thread Michael Armbrust
Can you type last and print the full log?  Also include your sbt

On Sat, May 3, 2014 at 9:02 PM, Carter  wrote:

> Hi Michael,
> Thank you very much for your reply.
> Sorry, I am not very familiar with sbt. Could you tell me where to set the
> Java option for the sbt fork for my program? I brought up the sbt console
> and ran 'set javaOptions += "-Xmx1G"' in it, but it returned an error:
> [error] object scala not found.
> [error] Use 'last' for the full log.
> Is this the right way to set the java option? Thank you very much.

Re: performance improvement on second operation...without caching?

2014-05-03 Thread Matei Zaharia
Yes, this happens as long as you use the same RDD. For example, say you do the following:

data1 = sc.textFile(…).map(…).reduceByKey(…)

The first count() causes outputs of the map/reduce pair in there to be written 
out to shuffle files. Next time you do a count, on either this RDD or a child 
(e.g. after the filter), we notice that output files were already generated for 
this shuffle so we don’t rerun the map stage. Note that the output does get 
read again over the network, which is kind of wasteful (if you really wanted to 
reuse this as quickly as possible you’d use cache()).
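The mechanism Matei describes can be illustrated with a toy model. The Python class below is not Spark's DAGScheduler; it is a sketch, assuming a single map -> reduceByKey pipeline, in which the first action writes "shuffle files" (here a dict) and later actions, including a filtered count standing in for a child RDD, reuse them and only redo the reduce side. All names are hypothetical.

```python
from collections import defaultdict


class ToyShuffledRDD:
    """Toy model of a map -> reduceByKey pipeline whose map output is kept."""

    def __init__(self, records):
        self.records = records
        self.shuffle_files = None  # stands in for map output written to local disk
        self.map_stage_runs = 0
        self.reduce_stage_runs = 0

    def _run_map_stage(self):
        self.map_stage_runs += 1
        buckets = defaultdict(list)
        for word in self.records:
            buckets[word].append(1)  # like .map(lambda w: (w, 1)) before the shuffle
        self.shuffle_files = dict(buckets)

    def _reduce(self):
        self.reduce_stage_runs += 1  # the shuffle output is read again every time
        return {k: sum(v) for k, v in self.shuffle_files.items()}

    def count(self, keep=lambda pair: True):
        """Number of reduced (key, total) pairs passing `keep` (a stand-in for filter)."""
        if self.shuffle_files is None:  # map output missing: run the map stage
            self._run_map_stage()
        return sum(1 for pair in self._reduce().items() if keep(pair))
```

Calling `count()` twice runs the map stage once and the reduce side twice; calling `cache()` on the reduced RDD in real Spark would avoid even that re-read, which is the trade-off mentioned above.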


On May 3, 2014, at 8:44 PM, Koert Kuipers  wrote:

> Hey Matei,
> Not sure I understand that. These are 2 separate jobs. So the second job
> takes advantage of the fact that there is map output left somewhere on disk 
> from the first job, and re-uses that?
> On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia  wrote:
> Hi Diana,
> Apart from these reasons, in a multi-stage job, Spark saves the map output 
> files from map stages to the filesystem, so it only needs to rerun the last 
> reduce stage. This is why you only saw one stage executing. These files are 
> saved for fault recovery but they speed up subsequent runs.
> Matei
> On May 3, 2014, at 5:21 PM, Patrick Wendell  wrote:
>> Ethan,
>> What you said is actually not true: Spark won't cache RDDs unless you ask
>> it to.
>> The observation here, that running the same job can speed up substantially
>> even without caching, is common. This is because other components in the
>> stack are performing caching and optimizations. Two that can make a huge
>> difference are:
>> 1. The OS buffer cache, which keeps recently read disk blocks in memory.
>> 2. The Java just-in-time (JIT) compiler, which uses runtime profiling to
>> significantly speed up execution.
>> These can make a huge difference if you are running the same job
>> over and over. There are other things too, like the OS network stack
>> increasing TCP windows and so forth. These all improve response time
>> as a Spark program executes.
>> On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett  wrote:
>> I believe Spark caches RDDs it has memory for regardless of whether you 
>> actually call the 'cache' method on the RDD. The 'cache' method just tips 
>> off Spark that the RDD should have higher priority. At least, that is my 
>> experience and it seems to correspond with your experience and with my 
>> recollection of other discussions on this topic on the list. However, going 
>> back and looking at the programming guide, this is not the way the 
>> cache/persist behavior is described. Does the guide need to be updated?
>> On Fri, May 2, 2014 at 9:04 AM, Diana Carroll  wrote:
>> I'm just Posty McPostalot this week, sorry folks! :-)
>> Anyway, another question today:
>> I have a bit of code that is pretty time consuming (pasted at the end of the 
>> message):
>> It reads in a bunch of XML files, parses them, extracts some data in a map, 
>> counts (using reduce), and then sorts.   All stages are executed when I do a 
>> final operation (take).  The first stage is the most expensive: on first run 
>> it takes 30s to a minute.
>> I'm not caching anything.
>> When I re-execute that take at the end, I expected it to re-execute all the 
>> same stages, and take approximately the same amount of time, but it didn't.  
>> The second "take" executes only a single stage, which runs very
>> fast: the whole operation takes less than 1 second (down from 5 minutes!)
>> While this is awesome (!) I don't understand it.  If I'm not caching data,
>> why would I see such a marked performance improvement on subsequent
>> execution?
>> (or is this related to the known 0.9.1 bug about sortByKey executing an
>> action when it shouldn't?)
>> Thanks,
>> Diana
>> # load XML files containing device activation records.
>> # Find the most common device models activated
>> import xml.etree.ElementTree as ElementTree
>> # Given a partition containing multi-line XML, parse the contents. 
>> # Return an iterator of activation Elements contained in the partition
>> def getactivations(fileiterator):
>>     s = ''.join(str(i) for i in fileiterator)
>>     filetree = ElementTree.fromstring(s)
>>     return filetree.iter('activation')
>> # Get the model name from a device activation record
>> def getmodel(activation):
>>     return activation.find('model').text
>> filename="hdfs://localhost/user/training/activations/*.xml"
>> # parse each partition as a file into an activation XML record
>> activations = sc.textFile(filename)
>> activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
>> models = activationTrees.map(lambda activation: getmodel(activation))
>> # count and sort activations by model
>> topmodels = models.map(lambda model: (mod

Re: spark run issue

2014-05-03 Thread Tathagata Das
All the stuff in lib_managed is what gets downloaded by sbt/maven when you
compile. Those are necessary for running spark, spark streaming, etc. But
you should not have to add all that to the classpath individually and manually
when running Spark programs. If you are trying to run your Spark program
locally, you should use sbt or maven to compile your project with Spark as
a dependency, and sbt/maven will take care of putting all the necessary
jars in the classpath (when you run your program with sbt/maven). If
you are trying to run your Spark program on a cluster, then refer to
the deployment guide. To run a Spark standalone cluster, you just have to
compile Spark and place the whole Spark directory on the worker nodes. For
other deploy modes like YARN and Mesos, you should just compile Spark into
a big all-inclusive jar and supply that when you launch your program on
YARN/Mesos. See the guide for more details.


On Sat, May 3, 2014 at 7:24 PM, Weide Zhang  wrote:

> Hi Tathagata,
> I actually have a separate question. What is the lib_managed folder
> inside the spark source folder used for? Are those the libraries required
> for spark streaming to run? Do they need to be added to the spark classpath
> when starting a spark cluster?
> Weide
> On Sat, May 3, 2014 at 7:08 PM, Weide Zhang  wrote:
>> Hi Tathagata,
>> I figured out the reason. I was adding the wrong kafka lib alongside
>> the version spark uses. Sorry for spamming.
>> Weide
>> On Sat, May 3, 2014 at 7:04 PM, Tathagata Das wrote:
>>> I am a little confused about the version of Spark you are using. Are you
>>> using Spark 0.9.1 that uses scala 2.10.3 ?
>>> TD
>>> On Sat, May 3, 2014 at 6:16 PM, Weide Zhang  wrote:
 Hi, I'm trying to run the kafka-word-count example in spark2.9.1. I
 encountered an exception when initializing the kafka consumer/producer config.
 I'm using scala 2.10.3 and a maven build of the spark streaming kafka
 library that comes with spark2.9.1. Has anyone seen this exception before?


  [java] The args attribute is deprecated. Please use nested arg
  [java] Exception in thread "main" java.lang.NoClassDefFoundError:
  [java] at
  [java] at
  [java] at
  [java] Caused by: java.lang.ClassNotFoundException:
  [java] at$
  [java] at
  [java] at
  [java] at java.lang.ClassLoader.loadClass(
  [java] at
  [java] at java.lang.ClassLoader.loadClass(
  [java] ... 3 more
  [java] Java Result: 1


Re: Reading and processing binary format files using spark

2014-05-03 Thread Mayur Rustagi
Hadoop InputFormat and OutputFormat would be the best way.


On Sat, May 3, 2014 at 10:12 AM, Chengi Liu  wrote:

> Hi,
> Let's say I have millions of binary format files, and I have a
> Java (or Python) library which reads and parses these binary formatted
> files.
> Say
> import foo
> f =
> header = f.get_header()
> and some other methods..
> What I was thinking was to write a Hadoop input format to read and parse
> these files, but since I am a newbie in Spark, I was wondering if I can
> directly open these files and use the existing libraries to process them.
> I don't necessarily need to store these files in HDFS; even NFS will work.
> Is there a way I can use Spark without having to write the parser again?
> Thanks
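Besides the Hadoop InputFormat route recommended above, the approach Chengi asks about can be sketched as: distribute the file paths and call the existing parser inside each task. The Python below assumes a hypothetical fixed header layout (a 4-byte magic, a 16-bit version and a 32-bit record count, little-endian) in place of the unnamed `foo` library, and assumes every worker can read the same paths, e.g. over NFS. `read_header` and `parse_paths` are hypothetical names.

```python
import struct

# Hypothetical header layout: 4-byte magic, uint16 version, uint32 record count,
# little-endian. Replace with calls into the existing parsing library.
HEADER = struct.Struct("<4sHI")


def read_header(path):
    """Parse the fixed-size header at the start of one binary file."""
    with open(path, "rb") as f:
        magic, version, count = HEADER.unpack(f.read(HEADER.size))
    return {"path": path, "magic": magic, "version": version, "records": count}


def parse_paths(paths):
    """What each Spark task would run on its share of the file paths.

    In PySpark this could be used roughly as
    sc.parallelize(paths, num_slices).mapPartitions(parse_paths).
    """
    for path in paths:
        yield read_header(path)
```

Because only the path strings are shipped to the cluster, nothing has to be copied into HDFS first; the trade-off is that Spark gets no data-locality information for the files.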