Re: return probability \ confidence instead of actual class

2014-09-21 Thread Liquan Pei
Hi Adamantios,

For your first question: after you train the SVM, you get a model with a
vector of weights w and an intercept b. Points x such that w.dot(x) + b = 1
or w.dot(x) + b = -1 lie on the margin boundaries. For a point x, the
quantity w.dot(x) + b is a confidence measure of the classification.

Code-wise, suppose you trained your model via

val model = SVMWithSGD.train(...)

You can then set a threshold by calling

model.setThreshold(your threshold here)

to set the threshold that separates positive predictions from negative
predictions.

For more info, please take a look at
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.SVMModel
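
As a rough sketch (the file path and cutoff below are placeholders, not from
this thread), you can also clear the threshold so that predict returns the
raw score w.dot(x) + b instead of a 0/1 label, and filter on its magnitude
yourself:

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt") // placeholder path
val model = SVMWithSGD.train(data, 100)

// Return raw scores instead of 0/1 predictions
model.clearThreshold()

// Keep only the points the model is very certain about (the 1.0 cutoff is arbitrary)
val confident = data.filter(p => math.abs(model.predict(p.features)) > 1.0)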

For your second question, SVMWithSGD only supports binary classification.

Hope this helps,

Liquan

On Sun, Sep 21, 2014 at 11:22 PM, Adamantios Corais <
adamantios.cor...@gmail.com> wrote:

> Nobody?
>
> If that's not supported already, can you please, at least, give me a few hints
> on how to implement it?
>
> Thanks!
>
>
> On Fri, Sep 19, 2014 at 7:43 PM, Adamantios Corais <
> adamantios.cor...@gmail.com> wrote:
>
>> Hi,
>>
>> I am working with the SVMWithSGD classification algorithm on Spark. It
>> works fine for me, however, I would like to recognize the instances that
>> are classified with a high confidence from those with a low one. How do we
>> define the threshold here? Ultimately, I want to keep only those for which
>> the algorithm is very *very* certain about its decision! How to do
>> that? Is this feature supported already by any MLlib algorithm? What if I
>> had multiple categories?
>>
>> Any input is highly appreciated!
>>
>
>


-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: Spark streaming twitter exception

2014-09-21 Thread Akhil Das
Can you try adding these dependencies?

libraryDependencies += "org.apache.spark" % "spark-streaming-twitter_2.10"
% "1.0.1"
libraryDependencies += "org.twitter4j" % "twitter4j-core" % "4.0.0"
libraryDependencies += "org.twitter4j" % "twitter4j" % "4.0.0"


And make sure these 3 jars are downloaded and present in the classpath.
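
For reference, a complete dependency block might look like the sketch below
(Scala 2.10 and the 1.0.1 versions are assumed from the build.sbt above; note
that I use twitter4j-stream here, which twitter4j publishes alongside
twitter4j-core):

// build.sbt (sketch)
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"              % "1.0.1",
  "org.apache.spark" %% "spark-streaming"         % "1.0.1",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.0.1",
  "org.twitter4j"    %  "twitter4j-core"          % "4.0.0",
  "org.twitter4j"    %  "twitter4j-stream"        % "4.0.0"
)

Since spark-submit only ships the application jar itself, building a fat jar
(for example with sbt-assembly) or passing the extra jars via --jars is a
common way to make sure they reach the executors.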

Thanks
Best Regards

On Sun, Sep 21, 2014 at 3:46 AM, Maisnam Ns  wrote:

> HI ,
>
> Can somebody help me with adding library dependencies in  my build.sbt so
> that the java.lang.NoClassDefFoundError issue can be resolved.
>
> My sbt (only the dependencies part) ->
>
>  libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % "1.0.1" ,
>   "org.apache.spark" %% "spark-streaming" % "1.0.1" ,
>   "org.apache.spark" %% "spark-streaming-twitter" % "1.0.1"
> )
>
> The exception , I am getting  ->
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/twitter/TwitterUtils$ at
> TwitterApp$.main(TwitterApp.scala:64)
>at TwitterApp.main(TwitterApp.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:330)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> The command to submit the jar -> bin/spark-submit --class "TwitterApp"
> --master local[4] twitterapp-project_2.10-1.0.jar
>
> Thanks in advance.
>
> Regards,
> NIranjan
>
>


Worker state is 'killed'

2014-09-21 Thread Sarath Chandra
Hi All,

I'm executing a simple job in Spark which reads a file on HDFS, processes
the lines and saves the processed lines back to HDFS. All three stages
complete correctly and I'm able to see the processed file on HDFS.

But on the Spark UI the worker state is shown as "killed", and I'm not
finding any exceptions in the logs.

What could be going wrong?

...
var newLines = lines.flatMap(line => process(line));
newLines.saveAsTextFile(hdfsPath);
...
def process(line: String): Array[String] = {
...
Array(str1, str2);
}
...

~Sarath.


Possibly a dumb question: differences between saveAsNewAPIHadoopFile and saveAsNewAPIHadoopDataset?

2014-09-21 Thread innowireless TaeYun Kim
Hi,

 

I'm confused by saveAsNewAPIHadoopFile and saveAsNewAPIHadoopDataset.

What's the difference between the two?

What are the individual use cases of the two APIs?

Could you describe the internal flows of the two APIs briefly?

 

I've used Spark for several months, but I have no experience with MapReduce
programming.

(I've read a few book chapters on MapReduce, but have not actually written
code myself.)

So maybe this confusion comes from my lack of experience with MapReduce
programming.

(I hoped it wouldn't be necessary, since I could use Spark.)

 

Thanks.
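
For anyone with the same question, a rough sketch of how the two calls are
typically invoked (paths and types below are illustrative, and this is a
sketch rather than an authoritative description of the internals):
saveAsNewAPIHadoopFile takes the output path and classes directly, while
saveAsNewAPIHadoopDataset takes a Hadoop Job configuration that already
carries the output format, classes and path.

import org.apache.spark.SparkContext._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }

// 1) saveAsNewAPIHadoopFile: path and classes are passed explicitly.
pairs.saveAsNewAPIHadoopFile(
  "hdfs:///tmp/out-file",
  classOf[Text],
  classOf[IntWritable],
  classOf[TextOutputFormat[Text, IntWritable]])

// 2) saveAsNewAPIHadoopDataset: everything is carried by the Hadoop Job config.
val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[IntWritable])
job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
FileOutputFormat.setOutputPath(job, new Path("hdfs:///tmp/out-dataset"))
pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)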

 



Re: return probability \ confidence instead of actual class

2014-09-21 Thread Adamantios Corais
Nobody?

If that's not supported already, can you please, at least, give me a few hints
on how to implement it?

Thanks!


On Fri, Sep 19, 2014 at 7:43 PM, Adamantios Corais <
adamantios.cor...@gmail.com> wrote:

> Hi,
>
> I am working with the SVMWithSGD classification algorithm on Spark. It
> works fine for me, however, I would like to recognize the instances that
> are classified with a high confidence from those with a low one. How do we
> define the threshold here? Ultimately, I want to keep only those for which
> the algorithm is very *very* certain about its decision! How to do
> that? Is this feature supported already by any MLlib algorithm? What if I
> had multiple categories?
>
> Any input is highly appreciated!
>


Re: java.lang.ClassNotFoundException on driver class in executor

2014-09-21 Thread Andrew Or
Hi Barrington,

Have you tried running it from the command line? (i.e. bin/spark-submit
--master yarn-client --class YOUR_CLASS YOUR_JAR) Does it still fail? I am
not super familiar with running Spark through IntelliJ, but AFAIK the
classpaths are set up a little differently there. Also, spark-submit does
this for you nicely, so if you go through this path you don't even have to
call `setJars` as you did in your application.
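
If you do keep launching from the IDE, one workaround (an assumption on my
part, not something from this thread) is to point setJars at an explicitly
built jar, since jarOfClass can only find the jar when the class was actually
loaded from one:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.host", "barrymac")
  .setMaster("yarn-client")
  .setAppName("Lasco Script")
  // Placeholder path: build the jar first, e.g. with sbt package / assembly
  .setJars(Seq("/path/to/lascoscript-assembly.jar"))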

-Andrew

2014-09-21 12:52 GMT-07:00 Barrington Henry :

> Hi,
>
> I am running spark from my IDE (IntelliJ) using YARN as my cluster manager.
> However, the executor node is not able to find my main driver class
> “LascoScript”. I keep getting  java.lang.ClassNotFoundException.
> I tried adding  the jar of the main class by running the snippet below
>
>
>val conf = new SparkConf().set("spark.driver.host", "barrymac")
>   .setMaster("yarn-client")
>   .setAppName("Lasco Script")
>
> .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
>
> But the jarOfClass function returns nothing. See below for logs.
>
> 
>
> 14/09/21 10:53:15 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> barrymac): java.lang.ClassNotFoundException: LascoScript$$anonfun$1
> java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> java.security.AccessController.doPrivileged(Native Method)
> java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> java.lang.ClassLoader.loadClass(ClassLoader.java:423)
> java.lang.ClassLoader.loadClass(ClassLoader.java:356)
> java.lang.Class.forName0(Native Method)
> java.lang.Class.forName(Class.java:264)
>
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
>
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1593)
>
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
>
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
>
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
>
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
> org.apache.spark.scheduler.Task.run(Task.scala:54)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> java.lang.Thread.run(Thread.java:722)
> 14/09/21 10:53:15 INFO TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1)
> on executor barrymac: java.lang.ClassNotFoundException
> (LascoScript$$anonfun$1) [duplicate 1]
> 14/09/21 10:53:15 INFO TaskSetManager: Starting task 1.1 in stage 0.0 (TID
> 4, barrymac, NODE_LOCAL, 1312 bytes)
> 14/09/21 10:53:15 INFO TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2)
> on executor barrymac: java.lang.ClassNotFoundException
> (LascoScript$$anonfun$1) [duplicate 2]
> 14/09/21 10:53:15 INFO TaskSetManager: Starting task 2.1 in stage 0.0 (TID
> 5, barrymac, NODE_LOCAL, 1312 bytes)
> 14/09/21 10:53:15 INFO TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3)
> on executor barrymac: java.lang.ClassNotFoundException
> (LascoScript$$anonfun$1) [duplicate 3]
> 14/09/21 10:53:15 INFO TaskSetManager: Starting task 3.1 in stage 0.0 (TID
> 6, barrymac, NODE_LOCAL, 1312 bytes)
> 14/09/21 10:53:15 INFO TaskSetManager: Lost task 1.1 in stage 0.0 (TID 4)
> on executor barrymac: java.lang.ClassNotFoundException
> (LascoScript$$anonfun$1) [duplicate 4]
> 14/09/21 10:53:15 INFO TaskSetManager: Starting task 1.2 in stage 0.0 (TID
> 7, barrymac, NODE_LOCAL, 1312 bytes)
> 14/09/21 10:53:15 INFO TaskSetManager: Lost task 2.1 in stage 0.0 (TID 5)
> on executor barrymac: java.lang.ClassNotFoundException
> (LascoScript$$anonfun$1) [duplicate 5]
> 14/09/21 10:53:15 INFO TaskSetManager: Starting task 2.2 in stage 0.0 (TID
> 8, barrymac, NODE_LOCAL, 1312 bytes)
> 14/09/21 10:53:15 INFO TaskSetManager: Lost task 3.1 in stage 0.0 

Re: Problem with pyspark command line invocation -- option truncation... (Spark v1.1.0) ...

2014-09-21 Thread Andrew Or
Hi Didata,

An alternative to what Sandy proposed is to set the Spark properties in a
special file `conf/spark-defaults.conf`. That way you don't have to specify
all the configs through the command line every time. The `--conf` option is
mostly intended to change one or two parameters, but it becomes cumbersome
to specify `--conf` many times, one for each config you have.
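
A minimal sketch of what that file might contain for this case (the values
are illustrative, taken from the options discussed in this thread):

spark.executor.memory                1g
spark.executor.instances             3
spark.yarn.executor.memoryOverhead   512
spark.ui.port                        8468
spark.serializer                     org.apache.spark.serializer.KryoSerializer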

In general, when a Spark setting refers to "java options", it applies to
JVM options that are not Spark properties (e.g. -Xmx5g or -XX:-UseParallelGC).
The recommended way of setting Spark-specific properties is documented here:
http://spark.apache.org/docs/latest/configuration.html

Also, as an aside (you may already know this): to pinpoint exactly what
went wrong with your executors in YARN, you can visit the ResourceManager's
web UI and click into your application. If the Hadoop JobHistoryServer is
set up properly, it will redirect you to the logs of the failed executors.

Let me know if you have more questions,
-Andrew



2014-09-21 10:38 GMT-07:00 Sandy Ryza :

> If using a client deploy mode, the driver memory can't go through --conf.
>  spark-submit handles --driver-memory as a special case because it needs to
> know how much memory to give the JVM before starting it and interpreting
> the other properties.
>
> -Sandy
>
> On Tue, Sep 16, 2014 at 10:20 PM, Dimension Data, LLC. <
> subscripti...@didata.us> wrote:
>
>>  Hi Sandy:
>>
>> Thank you. I have not tried that mechanism (I wasn't aware of it). I will
>> try that instead.
>>
>> Is it possible to also represent '--driver-memory' and
>> '--executor-memory' (and basically all properties)
>> using the '--conf' directive?
>>
>> The Reason: I actually discovered the below issue while writing a custom
>> PYTHONSTARTUP script that I use
>> to launch *bpython* or *python* or my *WING python IDE* with. That
>> script reads a python *dict* (from a file)
>> containing key/value pairs from which it constructs the
>> "--driver-java-options ...", which I will now
>> switch to generating '--conf key1=val1 --conf key2=val2 --conf key3=val3 (and
>> so on), instead.
>>
>> If all of the properties could be represented in this way, then it makes
>> the code cleaner (all in
>> the dict file, and no one-offs).
>>
>> Either way, thank you. =:)
>>
>> Noel,
>> team didata
>>
>>
>>  On 09/16/2014 08:03 PM, Sandy Ryza wrote:
>>
>> Hi team didata,
>>
>>  This doesn't directly answer your question, but with Spark 1.1, instead
>> of using the driver options, it's better to pass your Spark properties using
>> the "conf" option.
>>
>>  E.g.
>> pyspark --master yarn-client --conf spark.shuffle.spill=true --conf
>> spark.yarn.executor.memoryOverhead=512M
>>
>>  Additionally, executor and driver memory have dedicated options:
>>
>>  pyspark --master yarn-client --conf spark.shuffle.spill=true --conf
>> spark.yarn.executor.memoryOverhead=512M --driver-memory 3G
>> --executor-memory 5G
>>
>>  -Sandy
>>
>>
>> On Tue, Sep 16, 2014 at 6:22 PM, Dimension Data, LLC. <
>> subscripti...@didata.us> wrote:
>>
>>>
>>>
>>> Hello friends:
>>>
>>> Yesterday I compiled Spark 1.1.0 against CDH5's Hadoop/YARN
>>> distribution. Everything went fine, and everything seems
>>> to work, but for the following.
>>>
>>> Following are two invocations of the 'pyspark' script, one with
>>> enclosing quotes around the options passed to
>>> '--driver-java-options', and one without them. I added the following
>>> one-line in the 'pyspark' script to
>>> show my problem...
>>>
>>> ADDED: echo "xxx${PYSPARK_SUBMIT_ARGS}xxx" # Added after the line that
>>> exports this variable.
>>>
>>> =
>>>
>>> FIRST:
>>> [ without enclosing quotes ]:
>>>
>>> user@linux$ pyspark --master yarn-client --driver-java-options
>>> -Dspark.executor.memory=1G -Dspark.ui.port=8468 -Dspark.driver.memory=512M
>>> -Dspark.yarn.executor.memoryOverhead=512M -Dspark.executor.instances=3
>>> -Dspark.yarn.jar=hdfs://namenode:8020/user/spark/share/lib/spark-assembly-1.1.0-hadoop2.3.0-cdh5.1.2.jar
>>> xxx --master yarn-client --driver-java-options
>>> -Dspark.executor.memory=1Gxxx  <--- echo statement show option truncation.
>>>
>>> While this succeeds in getting to a pyspark shell prompt (sc), the
>>> context isn't set up properly because, as seen
>>> in red above and below, only the first option took effect. (Note
>>> spark.executor.memory is correct but that's only because
>>> my spark defaults coincide with it.)
>>>
>>> 14/09/16 17:35:32 INFO yarn.Client:   command: $JAVA_HOME/bin/java
>>> -server -Xmx512m -Djava.io.tmpdir=$PWD/tmp
>>> '-Dspark.tachyonStore.folderName=spark-e225c04d-5333-4ca6-9a78-1c3392438d89'
>>> '-Dspark.serializer.objectStreamReset=100' '-Dspark.executor.memory=1G'
>>> '-Dspark.rdd.compress=True' '-Dspark.yarn.secondary.jars='
>>> '-Dspark.submit.pyFiles='
>>> '-Dspark.serializer=org.apache.spark.serializer.KryoSerializer'
>>> '-Dspark.driver.host=dstorm' '-Dspark.driver.appUIHistoryAddress=' '-
>>> Dspark.app.name=Py

Re: Spark and disk usage.

2014-09-21 Thread Andrew Ash
Thanks for the info Burak!

I filed a bug on myself at https://issues.apache.org/jira/browse/SPARK-3631
to turn this information into a new section on the programming guide.
Thanks for the explanation it's very helpful.

Andrew

On Wed, Sep 17, 2014 at 12:08 PM, Burak Yavuz  wrote:

> Yes, writing to HDFS is more expensive, but I feel it is still a small
> price to pay when compared to having a Disk Space Full error three hours in
> and having to start from scratch.
>
> The main goal of checkpointing is to truncate the lineage. Clearing up
> shuffle writes comes as a bonus to checkpointing; it is not the main goal.
> The
> subtlety here is that .checkpoint() is just like .cache(). Until you call
> an action, nothing happens. Therefore, if you're going to do 1000 maps in a
> row and you don't want to checkpoint in the meantime until a shuffle
> happens, you will still get a StackOverflowError, because the lineage is
> too long.
>
> I went through some of the code for checkpointing. As far as I can tell,
> it materializes the data in HDFS, and resets all its dependencies, so you
> start
> a fresh lineage. My understanding would be that checkpointing still should
> be done every N operations to reset the lineage. However, an action must be
> performed before the lineage grows too long.
>
> I believe it would be nice to write up checkpointing in the programming
> guide. The reason that it's not there yet I believe is that most
> applications don't
> grow such a long lineage, except in Spark Streaming, and some MLlib
> algorithms. If you can help with the guide, I think it would be a nice
> feature to have!
>
> Burak
>
>
> - Original Message -
> From: "Andrew Ash" 
> To: "Burak Yavuz" 
> Cc: "Макар Красноперов" , "user" <
> user@spark.apache.org>
> Sent: Wednesday, September 17, 2014 11:04:02 AM
> Subject: Re: Spark and disk usage.
>
> Thanks for the info!
>
> Are there performance impacts with writing to HDFS instead of local disk?
>  I'm assuming that's why ALS checkpoints every third iteration instead of
> every iteration.
>
> Also I can imagine that checkpointing should be done every N shuffles
> instead of every N operations (counting maps), since only the shuffle
> leaves data on disk.  Do you have any suggestions on this?
>
> We should write up some guidance on the use of checkpointing in the
> programming
> guide  - I
> can
> help with this
>
> Andrew
>
>
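
To make the checkpointing discussion above concrete, here is a minimal
sketch (the checkpoint directory, sizes and interval are illustrative); the
action after checkpoint() is what actually materializes the data and
truncates the lineage:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("checkpoint-sketch"))
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // placeholder HDFS path

var rdd = sc.parallelize(1 to 1000000)
for (i <- 1 to 1000) {
  rdd = rdd.map(_ + 1)
  if (i % 100 == 0) {   // checkpoint every N operations to keep the lineage short
    rdd.checkpoint()
    rdd.count()         // an action is needed before the lineage is actually reset
  }
}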


Re: Found both spark.driver.extraClassPath and SPARK_CLASSPATH

2014-09-21 Thread Koert Kuipers
i have found no way around this. basically this makes SPARK_CLASSPATH
unusable. and the alternative for enabling lzo on a cluster is not
reasonable.

one has to set in spark-defaults.conf:
spark.executor.extraClassPath
/usr/lib/hadoop/lib/hadoop-lzo-cdh4-0.4.15-gplextras.jar
spark.executor.extraLibraryPath
/usr/lib/hadoop/lib/native/Linux-amd64-64:/usr/lib/hadoop/lib/native

and then for every job submitted one has to do using spark-submit (i get
errors if i leave this out):
--driver-class-path /usr/lib/hadoop/lib/hadoop-lzo-cdh4-0.4.15-gplextras.jar
--driver-library-path
/usr/lib/hadoop/lib/native/Linux-amd64-64:/usr/lib/hadoop/lib/native

i think the settings in spark-defaults.conf are fine, but expecting the
user to set the driver-class-path and driver-library-path like that is
cumbersome and requires knowledge of the locations of certain files on the
cluster. that's not workable for me.

On Mon, Sep 15, 2014 at 11:16 AM, Koert Kuipers  wrote:

> in spark 1.1.0 i get this error:
>
> 2014-09-14 23:17:01 ERROR actor.OneForOneStrategy: Found both
> spark.driver.extraClassPath and SPARK_CLASSPATH. Use only the former.
>
> i checked my application. i do not set spark.driver.extraClassPath or
> SPARK_CLASSPATH.
>
> SPARK_CLASSPATH is set in spark-env.sh since the machine is a worker and
> this is how lzo is added to classpath. as a user i cannot modify this.
>
> looking at the logs i see the value of SPARK_CLASSPATH ends up in
> spark.driver.extraClassPath, but thats not done by me. i presume some logic
> in spark-submit does this.
>
>


java.lang.ClassNotFoundException on driver class in executor

2014-09-21 Thread Barrington Henry
Hi,

I am running spark from my IDE (IntelliJ) using YARN as my cluster manager.
However, the executor node is not able to find my main driver class 
“LascoScript”. I keep getting  java.lang.ClassNotFoundException.
I tried adding  the jar of the main class by running the snippet below


   val conf = new SparkConf().set("spark.driver.host", "barrymac")
  .setMaster("yarn-client")
  .setAppName("Lasco Script")
  
.setJars(SparkContext.jarOfClass(this.getClass).toSeq)

But the jarOfClass function returns nothing. See below for logs.



14/09/21 10:53:15 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 
barrymac): java.lang.ClassNotFoundException: LascoScript$$anonfun$1
java.net.URLClassLoader$1.run(URLClassLoader.java:366)
java.net.URLClassLoader$1.run(URLClassLoader.java:355)
java.security.AccessController.doPrivileged(Native Method)
java.net.URLClassLoader.findClass(URLClassLoader.java:354)
java.lang.ClassLoader.loadClass(ClassLoader.java:423)
java.lang.ClassLoader.loadClass(ClassLoader.java:356)
java.lang.Class.forName0(Native Method)
java.lang.Class.forName(Class.java:264)

org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1593)
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)

org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)
14/09/21 10:53:15 INFO TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1) on 
executor barrymac: java.lang.ClassNotFoundException (LascoScript$$anonfun$1) 
[duplicate 1]
14/09/21 10:53:15 INFO TaskSetManager: Starting task 1.1 in stage 0.0 (TID 4, 
barrymac, NODE_LOCAL, 1312 bytes)
14/09/21 10:53:15 INFO TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2) on 
executor barrymac: java.lang.ClassNotFoundException (LascoScript$$anonfun$1) 
[duplicate 2]
14/09/21 10:53:15 INFO TaskSetManager: Starting task 2.1 in stage 0.0 (TID 5, 
barrymac, NODE_LOCAL, 1312 bytes)
14/09/21 10:53:15 INFO TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3) on 
executor barrymac: java.lang.ClassNotFoundException (LascoScript$$anonfun$1) 
[duplicate 3]
14/09/21 10:53:15 INFO TaskSetManager: Starting task 3.1 in stage 0.0 (TID 6, 
barrymac, NODE_LOCAL, 1312 bytes)
14/09/21 10:53:15 INFO TaskSetManager: Lost task 1.1 in stage 0.0 (TID 4) on 
executor barrymac: java.lang.ClassNotFoundException (LascoScript$$anonfun$1) 
[duplicate 4]
14/09/21 10:53:15 INFO TaskSetManager: Starting task 1.2 in stage 0.0 (TID 7, 
barrymac, NODE_LOCAL, 1312 bytes)
14/09/21 10:53:15 INFO TaskSetManager: Lost task 2.1 in stage 0.0 (TID 5) on 
executor barrymac: java.lang.ClassNotFoundException (LascoScript$$anonfun$1) 
[duplicate 5]
14/09/21 10:53:15 INFO TaskSetManager: Starting task 2.2 in stage 0.0 (TID 8, 
barrymac, NODE_LOCAL, 1312 bytes)
14/09/21 10:53:15 INFO TaskSetManager: Lost task 3.1 in stage 0.0 (TID 6) on 
executor barrymac: java.lang.ClassNotFoundException (LascoScript$$anonfun$1) 
[duplicate 6]
14/09/21 10:53:15 INFO TaskSetManager: Starting task 3.2 in stage 0.0 (TID 9, 
barrymac, NODE_LOCAL, 1312 bytes)
14/09/21 10:53:15 INFO TaskSetManager: Lost task 1.2 in stage 0.0 (TID 7) on 
executor barrymac: java.lang.ClassNotFoundException (LascoScript$$anonfun$1) 
[duplicate 7]
14/09/21 10:53:15 INFO TaskSetManager: Starting task 1.3 in stage 0.0 (TID 10, 
barrymac, NODE_LOCAL,

Re: Shuffle size difference - operations on RDD vs. operations on SchemaRDD

2014-09-21 Thread Michael Armbrust
Spark SQL always uses a custom configuration of Kryo under the hood to
improve shuffle performance:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
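
If you want to check how much of the gap comes from serialization, a rough
sketch of switching the plain-RDD job to Kryo (a generic setting, not the
exact configuration Spark SQL uses internally):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("rdd-vs-schemardd")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // optionally point spark.kryo.registrator at a registrator for your case classes
val sc = new SparkContext(conf)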

Michael

On Sun, Sep 21, 2014 at 9:04 AM, Grega Kešpret  wrote:

> Hi,
>
> I am seeing different shuffle write sizes when using SchemaRDD (versus
> normal RDD). I'm doing the following:
>
> case class DomainObj(a: String, b: String, c: String, d: String)
>
> val logs: RDD[String] = sc.textFile(...)
> val filtered: RDD[String] = logs.filter(...)
> val myDomainObjects: RDD[DomainObj] = filtered.flatMap(...)
>
> 
> 1. Operations on RDD:
> 
> val results = requests
> .filter(obj => obj.a == "SomeValue" || obj.a == "SomeOtherValue")
> .mapPartitions(objs => objs.map(obj => (obj, 1)))
> .reduceByKey(_ + _, 200)
> .collect()
>
> 
> 2. Operations on SchemaRDD:
> 
> myDomainObjects.registerTempTable("myDomainObjects")
>
> val results = sqlContext.sql("""
> SELECT
> a, b, c, d, COUNT(*) total
> FROM
> myDomainObjects
> WHERE
> a IN ('SomeValue', 'SomeOtherValue')
> GROUP BY
> a, b, c, d
> """).collect()
>
> In the first case (RDD), the query returns in 2 minutes and 30 seconds
> with the input size 28.4GB, and shuffle write size 525.3MB and shuffle read
> size 472.5MB.
>
> In the second case (SchemaRDD), the query returns in 2 minutes and 9
> seconds with input size 28.4GB, and shuffle write 258.9MB and shuffle read
> 233.0MB.
>
> Since in the second case, the shuffle size is half of the first case, I'd
> like to understand why.
>
> Thanks,
> Grega
>


Re: pyspark on yarn - lost executor

2014-09-21 Thread Sandy Ryza
Hi Oleg,

Those parameters control the number and size of Spark's daemons on the
cluster.  If you're interested in how these daemons relate to each other
and interact with YARN, I wrote a post on this a little while ago -
http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/

In general, typing "spark-submit --help" will list the available options
and what they control.

To fetch the executor logs for an application, you can use "yarn logs
-applicationId ".

-Sandy

On Thu, Sep 18, 2014 at 5:47 AM, Oleg Ruchovets 
wrote:

> Great.
>   Upgrade helped.
>
> Still need some inputs:
> 1) Is there any log files of spark job execution?
> 2) Where can I read about tuning / parameter configuration:
>
> For example:
> --num-executors 12
> --driver-memory 4g
> --executor-memory 2g
>
> what is the meaning of those parameters?
>
> Thanks
> Oleg.
>
> On Thu, Sep 18, 2014 at 12:15 AM, Davies Liu 
> wrote:
>
>> Maybe the Python worker use too much memory during groupByKey(),
>> groupByKey() with larger numPartitions can help.
>>
>> Also, can you upgrade your cluster to 1.1? It can spill the data
>> to disk if the memory cannot hold all the data during groupByKey().
>>
>> Also, If there is hot key with dozens of millions of values, the PR [1]
>> can help it, it actually helped someone with large datasets (3T).
>>
>> Davies
>>
>> [1] https://github.com/apache/spark/pull/1977
>>
>> On Wed, Sep 17, 2014 at 7:31 AM, Oleg Ruchovets 
>> wrote:
>> >
>> > Sure, I'll post to the mail list.
>> >
>> > groupByKey(self, numPartitions=None)
>> >
>> > source code
>> >
>> > Group the values for each key in the RDD into a single sequence.
>> Hash-partitions the resulting RDD with into numPartitions partitions.
>> >
>> >
>> > So instead of using default I'll provide numPartitions , but what is
>> the best practice to calculate the number of partitions? and how number of
>> partitions related to my original problem?
>> >
>> >
>> > Thanks
>> >
>> > Oleg.
>> >
>> >
>> > http://spark.apache.org/docs/1.0.2/api/python/frames.html
>> >
>> >
>> >
>> > On Wed, Sep 17, 2014 at 9:25 PM, Eric Friedman <
>> eric.d.fried...@gmail.com> wrote:
>> >>
>> >> Look at the API for text file and groupByKey. Please don't take
>> threads off list. Other people have the same questions.
>> >>
>> >> 
>> >> Eric Friedman
>> >>
>> >> On Sep 17, 2014, at 6:19 AM, Oleg Ruchovets 
>> wrote:
>> >>
>> >> Can hou please explain how to configure partitions?
>> >> Thanks
>> >> Oleg
>> >>
>> >> On Wednesday, September 17, 2014, Eric Friedman <
>> eric.d.fried...@gmail.com> wrote:
>> >>>
>> >>> Yeah, you need to increase partitions. You only have one on your text
>> file. On groupByKey you're getting the pyspark default, which is too low.
>> >>>
>> >>> 
>> >>> Eric Friedman
>> >>>
>> >>> On Sep 17, 2014, at 5:29 AM, Oleg Ruchovets 
>> wrote:
>> >>>
>> >>> This is very good question :-).
>> >>>
>> >>> Here is my code:
>> >>>
>> >>> sc = SparkContext(appName="CAD")
>> >>> lines = sc.textFile(sys.argv[1], 1)
>> >>> result = lines.map(doSplit).groupByKey().mapValues(lambda vc:
>> my_custom_function(vc))
>> >>> result.saveAsTextFile(sys.argv[2])
>> >>>
>> >>> Should I configure partitioning manually ? Where should I configure
>> it? Where can I read about partitioning best practices?
>> >>>
>> >>> Thanks
>> >>> Oleg.
>> >>>
>> >>> On Wed, Sep 17, 2014 at 8:22 PM, Eric Friedman <
>> eric.d.fried...@gmail.com> wrote:
>> 
>>  How many partitions do you have in your input rdd?  Are you
>> specifying numPartitions in subsequent calls to groupByKey/reduceByKey?
>> 
>>  On Sep 17, 2014, at 4:38 AM, Oleg Ruchovets 
>> wrote:
>> 
>>  Hi ,
>>    I am execution pyspark on yarn.
>>  I have successfully executed initial dataset but now I growed it 10
>> times more.
>> 
>>  during execution I got all the time this error:
>>    14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost
>> executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated
>> 
>>   tasks are failed a resubmitted again:
>> 
>>  14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 21, 23,
>> 26, 29, 32, 33, 48, 75, 86, 91, 93, 94
>>  14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 31, 52,
>> 60, 93
>>  14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 19, 20,
>> 23, 27, 39, 51, 64
>>  14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 51, 68,
>> 80
>>  14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 1, 20,
>> 34, 42, 61, 67, 77, 81, 91
>> >>

How to initialize updateStateByKey operation

2014-09-21 Thread Soumitra Kumar
I started with StatefulNetworkWordCount to have a running count of words seen.

I have a file 'stored.count' which contains the word counts.

$ cat stored.count
a 1
b 2

I want to initialize StatefulNetworkWordCount with the values in the
'stored.count' file; how do I do that?

I looked at the paper 'EECS-2012-259.pdf' by Matei et al.; in figure 2, it would be
useful to have an initial RDD feeding into 'counts' at 't = 1', as below.

   initial
 |
t = 1: pageView -> ones -> counts
 |
t = 2: pageView -> ones -> counts
...

I have also attached the modified figure 2 of 
http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf .

I managed to hack Spark code to achieve this, and attaching the modified files.

Essentially, I added an argument 'initial: RDD[(K, S)]' to the updateStateByKey
method, as

def updateStateByKey[S: ClassTag](
    initial: RDD[(K, S)],
    updateFunc: (Seq[V], Option[S]) => Option[S],
    partitioner: Partitioner
  ): DStream[(K, S)]
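
For illustration, this is how the proposed overload might be called (the
stream source and parsing below are placeholders, and the overload itself
only exists with the modification described above):

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.StreamingContext._

// Mirrors StatefulNetworkWordCount, using the proposed initial-state overload.
val lines = ssc.socketTextStream("localhost", 9999)        // placeholder source
val wordDstream = lines.flatMap(_.split(" ")).map(w => (w, 1))

val initial = ssc.sparkContext.textFile("stored.count")    // lines like "a 1", "b 2"
  .map(_.split(" "))
  .map(a => (a(0), a(1).toInt))

val updateFunc = (values: Seq[Int], state: Option[Int]) =>
  Some(values.sum + state.getOrElse(0))

val counts = wordDstream.updateStateByKey(
  initial, updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism))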

If this sounds interesting to a larger crowd I would be happy to clean up the code
and volunteer to push it upstream. I don't know the procedure for that, though.

-Soumitra.
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.dstream

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.{Partitioner, HashPartitioner}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.{Time, Duration}

/**
 * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
 * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
 * these functions.
 */
class PairDStreamFunctions[K, V](self: DStream[(K,V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
  extends Serializable
{
  private[streaming] def ssc = self.ssc

  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
new HashPartitioner(numPartitions)
  }

  /**
   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
   * generate the RDDs with Spark's default number of partitions.
   */
  def groupByKey(): DStream[(K, Iterable[V])] = {
groupByKey(defaultPartitioner())
  }

  /**
   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
   * generate the RDDs with `numPartitions` partitions.
   */
  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = {
groupByKey(defaultPartitioner(numPartitions))
  }

  /**
   * Return a new DStream by applying `groupByKey` on each RDD. The supplied
   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   */
  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = {
val createCombiner = (v: V) => ArrayBuffer[V](v)
val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
  .asInstanceOf[DStream[(K, Iterable[V])]]
  }

  /**
   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
   * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
   * with Spark's default number of partitions.
   */
  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner())
  }

  /**
   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
   * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
   * with `numPartitions` partitions.
   */
  def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner(numPart

Re: Problem with pyspark command line invocation -- option truncation... (Spark v1.1.0) ...

2014-09-21 Thread Sandy Ryza
If using a client deploy mode, the driver memory can't go through --conf.
 spark-submit handles --driver-memory as a special case because it needs to
know how much memory to give the JVM before starting it and interpreting
the other properties.

-Sandy

On Tue, Sep 16, 2014 at 10:20 PM, Dimension Data, LLC. <
subscripti...@didata.us> wrote:

>  Hi Sandy:
>
> Thank you. I have not tried that mechanism (I wasn't aware of it). I will
> try that instead.
>
> Is it possible to also represent '--driver-memory' and '--executor-memory'
> (and basically all properties)
> using the '--conf' directive?
>
> The Reason: I actually discovered the below issue while writing a custom
> PYTHONSTARTUP script that I use
> to launch *bpython* or *python* or my *WING python IDE* with. That script
> reads a python *dict* (from a file)
> containing key/value pairs from which it constructs the
> "--driver-java-options ...", which I will now
> switch to generating '--conf key1=val1 --conf key2=val2 --conf key3=val3 (and
> so on), instead.
>
> If all of the properties could be represented in this way, then it makes
> the code cleaner (all in
> the dict file, and no one-offs).
>
> Either way, thank you. =:)
>
> Noel,
> team didata
>
>
>  On 09/16/2014 08:03 PM, Sandy Ryza wrote:
>
> Hi team didata,
>
>  This doesn't directly answer your question, but with Spark 1.1, instead
> of using the driver options, it's better to pass your Spark properties using
> the "conf" option.
>
>  E.g.
> pyspark --master yarn-client --conf spark.shuffle.spill=true --conf
> spark.yarn.executor.memoryOverhead=512M
>
> Additionally, executor and driver memory have dedicated options:
>
>  pyspark --master yarn-client --conf spark.shuffle.spill=true --conf
> spark.yarn.executor.memoryOverhead=512M --driver-memory 3G
> --executor-memory 5G
>
>  -Sandy
>
>
> On Tue, Sep 16, 2014 at 6:22 PM, Dimension Data, LLC. <
> subscripti...@didata.us> wrote:
>
>>
>>
>> Hello friends:
>>
>> Yesterday I compiled Spark 1.1.0 against CDH5's Hadoop/YARN distribution.
>> Everything went fine, and everything seems
>> to work, but for the following.
>>
>> Following are two invocations of the 'pyspark' script, one with enclosing
>> quotes around the options passed to
>> '--driver-java-options', and one without them. I added the following
>> one-line in the 'pyspark' script to
>> show my problem...
>>
>> ADDED: echo "xxx${PYSPARK_SUBMIT_ARGS}xxx" # Added after the line that
>> exports this variable.
>>
>> =
>>
>> FIRST:
>> [ without enclosing quotes ]:
>>
>> user@linux$ pyspark --master yarn-client --driver-java-options
>> -Dspark.executor.memory=1G -Dspark.ui.port=8468 -Dspark.driver.memory=512M
>> -Dspark.yarn.executor.memoryOverhead=512M -Dspark.executor.instances=3
>> -Dspark.yarn.jar=hdfs://namenode:8020/user/spark/share/lib/spark-assembly-1.1.0-hadoop2.3.0-cdh5.1.2.jar
>> xxx --master yarn-client --driver-java-options
>> -Dspark.executor.memory=1Gxxx  <--- echo statement show option truncation.
>>
>> While this succeeds in getting to a pyspark shell prompt (sc), the
>> context isn't set up properly because, as seen
>> in red above and below, only the first option took effect. (Note
>> spark.executor.memory is correct but that's only because
>> my spark defaults coincide with it.)
>>
>> 14/09/16 17:35:32 INFO yarn.Client:   command: $JAVA_HOME/bin/java
>> -server -Xmx512m -Djava.io.tmpdir=$PWD/tmp
>> '-Dspark.tachyonStore.folderName=spark-e225c04d-5333-4ca6-9a78-1c3392438d89'
>> '-Dspark.serializer.objectStreamReset=100' '-Dspark.executor.memory=1G'
>> '-Dspark.rdd.compress=True' '-Dspark.yarn.secondary.jars='
>> '-Dspark.submit.pyFiles='
>> '-Dspark.serializer=org.apache.spark.serializer.KryoSerializer'
>> '-Dspark.driver.host=dstorm' '-Dspark.driver.appUIHistoryAddress=' '-
>> Dspark.app.name=PySparkShell' '-Dspark.driver.appUIAddress=dstorm:4040'
>> '-Dspark.driver.extraJavaOptions=-Dspark.executor.memory=1G'
>> '-Dspark.fileserver.uri=http://192.168.0.16:60305'
>> '-Dspark.driver.port=44616' '-Dspark.master=yarn-client'
>> org.apache.spark.deploy.yarn.ExecutorLauncher --class 'notused' --jar
>> null  --arg  'dstorm:44616' --executor-memory 1024 --executor-cores 1 
>> --num-executors
>> 2 1> /stdout 2> /stderr
>>
>> (Note: I happen to notice that 'spark.driver.memory' is missing as well).
>>
>> ===
>>
>> NEXT:
>>
>> [ So let's try with enclosing quotes ]
>> user@linux$ pyspark --master yarn-client --driver-java-options
>> '-Dspark.executor.memory=1G -Dspark.ui.port=8468 -Dspark.driver.memory=512M
>> -Dspark.yarn.executor.memoryOverhead=512M -Dspark.executor.instances=3
>> -Dspark.yarn.jar=hdfs://namenode:8020/user/spark/share/lib/spark-assembly-1.1.0-hadoop2.3.0-cdh5.1.2.jar'
>> xxx --master yarn-client --driver-java-options
>> "-Dspark.executor.memory=1G -Dspark.ui.port=8468 -Dspark.driver.memory=512M
>> -Dspark.yarn.executor.memoryOverhead=512M -Dspark.executor.instances=3
>> -Dspar

Can SparkContext shared across nodes/drivers

2014-09-21 Thread 林武康
Hi all,
So far as I know, a SparkContext instance takes charge of some of the resources
of a cluster that the master assigns to it, and it is hard to share it between
different SparkContexts; meanwhile, scheduling between applications is also not
easy.
To address this without introducing an extra resource scheduling system such as
YARN/Mesos, I propose to create a special SparkContext that can be shared across
nodes/drivers, that is, submitting jobs from different nodes but sharing the same
RDD definitions and task scheduler.
Is this idea valuable? Is it possible to implement? Or is it of no value?
 
Thanks for any advices.
 
lin wukang

Re: Distributed dictionary building

2014-09-21 Thread Debasish Das
zipWithUniqueId is also affected...

I had to persist the dictionaries to make use of the indices lower down in
the flow...
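
For anyone hitting the same thing, the workaround Sean describes looks
roughly like this ('products' is an illustrative RDD[String]):

import org.apache.spark.storage.StorageLevel

// Pin the product -> index assignment before it is used anywhere else.
val dictionary = products.distinct().zipWithIndex()
  .persist(StorageLevel.MEMORY_AND_DISK)
dictionary.count() // materialize once so later evaluations reuse the same indices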

On Sun, Sep 21, 2014 at 1:15 AM, Sean Owen  wrote:

> Reference - https://issues.apache.org/jira/browse/SPARK-3098
> I imagine zipWithUniqueID is also affected, but may not happen to have
> exhibited in your test.
>
> On Sun, Sep 21, 2014 at 2:13 AM, Debasish Das 
> wrote:
> > Some more debug revealed that as Sean said I have to keep the
> dictionaries
> > persisted till I am done with the RDD manipulation.
> >
> > Thanks Sean for the pointer...would it be possible to point me to the
> JIRA
> > as well ?
> >
> > Are there plans to make it more transparent for the users ?
> >
> > Is it possible for the DAG to speculate such things...similar to branch
> > prediction ideas from comp arch...
> >
> >
> >
> > On Sat, Sep 20, 2014 at 1:56 PM, Debasish Das 
> > wrote:
> >>
> >> I changed zipWithIndex to zipWithUniqueId and that seems to be
> working...
> >>
> >> What's the difference between zipWithIndex vs zipWithUniqueId ?
> >>
> >> For zipWithIndex we don't need to run the count to compute the offset
> >> which is needed for zipWithUniqueId and so zipWithIndex is efficient ?
> It's
> >> not very clear from docs...
> >>
> >>
> >> On Sat, Sep 20, 2014 at 1:48 PM, Debasish Das  >
> >> wrote:
> >>>
> >>> I did not persist / cache it as I assumed zipWithIndex will preserve
> >>> order...
> >>>
> >>> There is also zipWithUniqueId...I am trying that...If that also shows
> the
> >>> same issue, we should make it clear in the docs...
> >>>
> >>> On Sat, Sep 20, 2014 at 1:44 PM, Sean Owen  wrote:
> 
>  From offline question - zipWithIndex is being used to assign IDs.
> From a
>  recent JIRA discussion I understand this is not deterministic within a
>  partition so the index can be different when the RDD is reevaluated.
> If you
>  need it fixed, persist the zipped RDD on disk or in memory.
> 
>  On Sep 20, 2014 8:10 PM, "Debasish Das" 
>  wrote:
> >
> > Hi,
> >
> > I am building a dictionary of RDD[(String, Long)] and after the
> > dictionary is built and cached, I find key "almonds" at value 5187
> using:
> >
> > rdd.filter{case(product, index) => product == "almonds"}.collect
> >
> > Output:
> >
> > Debug product almonds index 5187
> >
> > Now I take the same dictionary and write it out as:
> >
> > dictionary.map{case(product, index) => product + "," + index}
> > .saveAsTextFile(outputPath)
> >
> > Inside the map I also print what's the product at index 5187 and I
> get
> > a different product:
> >
> > Debug Index 5187 userOrProduct cardigans
> >
> > Is this an expected behavior from map ?
> >
> > By the way "almonds" and "apparel-cardigans" are just one off in the
> > index...
> >
> > I am using spark-1.1 but it's a snapshot..
> >
> > Thanks.
> > Deb
> >
> >
> >>>
> >>
> >
>


Shuffle size difference - operations on RDD vs. operations on SchemaRDD

2014-09-21 Thread Grega Kešpret
Hi,

I am seeing different shuffle write sizes when using SchemaRDD (versus
normal RDD). I'm doing the following:

case class DomainObj(a: String, b: String, c: String, d: String)

val logs: RDD[String] = sc.textFile(...)
val filtered: RDD[String] = logs.filter(...)
val myDomainObjects: RDD[DomainObj] = filtered.flatMap(...)


1. Operations on RDD:

val results = requests
.filter(obj => obj.a == "SomeValue" || obj.a == "SomeOtherValue")
.mapPartitions(objs => objs.map(obj => (obj, 1)))
.reduceByKey(_ + _, 200)
.collect()


2. Operations on SchemaRDD:

myDomainObjects.registerTempTable("myDomainObjects")

val results = sqlContext.sql("""
SELECT
a, b, c, d, COUNT(*) total
FROM
myDomainObjects
WHERE
a IN ('SomeValue', 'SomeOtherValue')
GROUP BY
a, b, c, d
""").collect()

In the first case (RDD), the query returns in 2 minutes and 30 seconds with
the input size 28.4GB, and shuffle write size 525.3MB and shuffle read size
472.5MB.

In the second case (SchemaRDD), the query returns in 2 minutes and 9
seconds with input size 28.4GB, and shuffle write 258.9MB and shuffle read
233.0MB.

Since in the second case, the shuffle size is half of the first case, I'd
like to understand why.

Thanks,
Grega


Re: Setting up Spark 1.1 on Windows 7

2014-09-21 Thread Khaja Mohideen
Setting java_opts helped me fix the problem.

Thanks,
-Khaja

On Sun, Sep 21, 2014 at 9:25 AM, Khaja Mohideen  wrote:

> I was able to move past this error by deleting the .ivy2/cache folder.
>
> However, I am running into an out of memory error
> [error] java.util.concurrent.ExecutionException:
> java.lang.OutOfMemoryError: Jav
> a heap space
> [error] Use 'last' for the full log.
>
> This is despite the fact that I have set m2_opts like this:
> -Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m
>
> Am I missing something?
>
> thanks,
> -Khaja
>
> On Sun, Sep 21, 2014 at 7:39 AM, Khaja M  wrote:
>
>> Hi:
>>
>> I am trying to setup Spark 1.1 on a Windows 7 box and I am running the sbt
>> assembly command and this is the error that I am seeing.
>>
>> [error] (streaming-flume-sink/*:update) sbt.ResolveException: unresolved
>> depende
>> ncy: commons-lang#commons-lang;2.6: configuration not found in
>> commons-lang#comm
>> ons-lang;2.6: 'compile'. It was required from
>> org.apache.avro#avro-compiler;1.7.
>> 3 compile
>> [error] (core/*:update) sbt.ResolveException: unresolved dependency:
>> org.slf4j#s
>> lf4j-api;1.7.5: configuration not found in org.slf4j#slf4j-api;1.7.5:
>> 'master'.
>> It was required from org.apache.spark#spark-core_2.10;1.1.0 compile
>> [error] Total time: 10 s, completed Sep 21, 2014 7:13:16 AM
>>
>> I looked at my local maven repo and found that I do have both the
>> dependencies there:
>> 1. commons-lang 2.6
>> 2. org.slf4j-api 1.7.5
>>
>> Any ideas on what I am missing?
>> Thanks,
>> -Khaja
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Setting-up-Spark-1-1-on-Windows-7-tp14759.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Avoid broacasting huge variables

2014-09-21 Thread octavian.ganea
Using mapPartitions and passing the big index object as a parameter to it was
not the best option, given the size of the big object and my RAM. The
workers died before starting the actual  computation. 

Anyway, creating a singleton object worked for me:

http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-td11048.html#a11315

The big index was loaded once per node.
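
The singleton pattern described in that thread looks roughly like this (the
index format, path and 'rdd' are placeholders):

// Loaded lazily, at most once per executor JVM, the first time a task touches it.
object BigIndexHolder {
  lazy val index: Map[String, Long] =
    scala.io.Source.fromFile("/local/path/to/index").getLines() // placeholder path
      .map { l => val Array(k, v) = l.split("\t"); (k, v.toLong) }
      .toMap
}

// Inside the job: no broadcast, each executor loads the index itself.
val annotated = rdd.mapPartitions { it =>
  val idx = BigIndexHolder.index
  it.map(x => (x, idx.getOrElse(x, -1L)))
}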



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Avoid-broacasting-huge-variables-tp14696p14766.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Setting up Spark 1.1 on Windows 7

2014-09-21 Thread Khaja Mohideen
I was able to move past this error by deleting the .ivy2/cache folder.

However, I am running into an out of memory error
[error] java.util.concurrent.ExecutionException:
java.lang.OutOfMemoryError: Jav
a heap space
[error] Use 'last' for the full log.

This is despite the fact that I have set m2_opts like this:
-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m

Am I missing something?

thanks,
-Khaja

On Sun, Sep 21, 2014 at 7:39 AM, Khaja M  wrote:

> Hi:
>
> I am trying to setup Spark 1.1 on a Windows 7 box and I am running the sbt
> assembly command and this is the error that I am seeing.
>
> [error] (streaming-flume-sink/*:update) sbt.ResolveException: unresolved
> depende
> ncy: commons-lang#commons-lang;2.6: configuration not found in
> commons-lang#comm
> ons-lang;2.6: 'compile'. It was required from
> org.apache.avro#avro-compiler;1.7.
> 3 compile
> [error] (core/*:update) sbt.ResolveException: unresolved dependency:
> org.slf4j#s
> lf4j-api;1.7.5: configuration not found in org.slf4j#slf4j-api;1.7.5:
> 'master'.
> It was required from org.apache.spark#spark-core_2.10;1.1.0 compile
> [error] Total time: 10 s, completed Sep 21, 2014 7:13:16 AM
>
> I looked at my local maven repo and found that I do have both the
> dependencies there:
> 1. commons-lang 2.6
> 2. org.slf4j-api 1.7.5
>
> Any ideas on what I am missing?
> Thanks,
> -Khaja
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Setting-up-Spark-1-1-on-Windows-7-tp14759.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Issues with partitionBy: FetchFailed

2014-09-21 Thread David Rowe
Hi,

I've seen this problem before, and I'm not convinced it's GC.

When spark shuffles it writes a lot of small files to store the data to be
sent to other executors (AFAICT). According to what I've read around the
place the intention is that these files be stored in disk buffers, and
since sync() is never called, they exist only in memory. The problem is
when you have a lot of shuffle data and the executors are configured to
use, say, 90% of your memory, one of those is going to end up on disk:
either the JVM will be swapped out, or the files will be written out of
cache.

So, when these nodes are timing out, it's not a GC problem, it's that the
machine is actually thrashing.

I've had some success with this problem by reducing the amount of memory
that the executors are configured to use from say 90% to 60%. I don't know
the internals of the code, but I'm sure this number is related to the
fraction of your data that's going to be shuffled to other nodes. In any
case, it's not something that I can estimate in my own jobs very well - I
usually have to find the right number by trial and error.

Perhaps somebody who knows the internals a bit better can shed some light.

Cheers

Dave

On Sun, Sep 21, 2014 at 9:54 PM, Shao, Saisai  wrote:

>  Hi,
>
>
>
> I've also met this problem before. I think you can try to set
> "spark.core.connection.ack.wait.timeout" to a large value to avoid the ack
> timeout; the default is 60 seconds.
>
>
>
> Sometimes, because of a GC pause or some other reason, the acknowledgement
> message will time out, which leads to this exception; you can try setting a
> larger value for this configuration.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Julien Carme [mailto:julien.ca...@gmail.com]
> *Sent:* Sunday, September 21, 2014 7:43 PM
> *To:* user@spark.apache.org
> *Subject:* Issues with partitionBy: FetchFailed
>
>
>
> Hello,
>
> I am facing an issue with partitionBy, it is not clear whether it is a
> problem with my code or with my spark setup. I am using Spark 1.1,
> standalone, and my other spark projects work fine.
>
> So I have to repartition a relatively large file (about 70 million lines).
> Here is a minimal version of what is not working fine:
>
> myRDD = sc.textFile("...").map { line => (extractKey(line),line) }
>
> myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
>
> myRepartitionedRDD.saveAsTextFile(...)
>
> It runs quite some time, until I get some errors and it retries. Errors
> are:
>
> FetchFailed(BlockManagerId(3,myWorker2, 52082,0),
> shuffleId=1,mapId=1,reduceId=5)
>
> Logs are not much more informative. I get:
>
> Java.io.IOException : sendMessageReliability failed because ack was not
> received within 60 sec
>
>
>
> I get similar errors with all my workers.
>
> Do you have some kind of explanation for this behaviour? What could be
> wrong?
>
> Thanks,
>
>
>
>
>


Re: Saving RDD with array of strings

2014-09-21 Thread Julien Carme
Just use flatMap, it does exactly what you need:

newLines.flatMap { lines => lines }.saveAsTextFile(...)


2014-09-21 11:26 GMT+02:00 Sarath Chandra <
sarathchandra.jos...@algofusiontech.com>:

> Hi All,
>
> If my RDD is having array/sequence of strings, how can I save them as a
> HDFS file with each string on separate line?
>
> For example if I write code as below, the output should get saved as hdfs
> file having one string per line
> ...
> ...
> var newLines = lines.map(line => myfunc(line));
> newLines.saveAsTextFile(hdfsPath);
> ...
> ...
> def myfunc(line: String):Array[String] = {
>   line.split(";");
> }
>
> Thanks,
> ~Sarath.
>


RE: Issues with partitionBy: FetchFailed

2014-09-21 Thread Shao, Saisai
Hi,

I've also met this problem before. I think you can try to set
"spark.core.connection.ack.wait.timeout" to a large value to avoid the ack
timeout; the default is 60 seconds.

Sometimes, because of a GC pause or some other reason, the acknowledgement
message will time out, which leads to this exception; you can try setting a
larger value for this configuration.
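
For example, a sketch (the 300 is just an illustration; the property is in
seconds):

val conf = new org.apache.spark.SparkConf()
  .set("spark.core.connection.ack.wait.timeout", "300") // raise from the 60s default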

Thanks
Jerry

From: Julien Carme [mailto:julien.ca...@gmail.com]
Sent: Sunday, September 21, 2014 7:43 PM
To: user@spark.apache.org
Subject: Issues with partitionBy: FetchFailed

Hello,
I am facing an issue with partitionBy, it is not clear whether it is a problem 
with my code or with my spark setup. I am using Spark 1.1, standalone, and my 
other spark projects work fine.
So I have to repartition a relatively large file (about 70 million lines). Here 
is a minimal version of what is not working fine:
myRDD = sc.textFile("...").map { line => (extractKey(line),line) }
myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
myRepartitionedRDD.saveAsTextFile(...)
It runs quite some time, until I get some errors and it retries. Errors are:
FetchFailed(BlockManagerId(3,myWorker2, 52082,0), 
shuffleId=1,mapId=1,reduceId=5)
Logs are not much more informative. I get:

Java.io.IOException : sendMessageReliability failed because ack was not 
received within 60 sec

I get similar errors with all my workers.
Do you have some kind of explanation for this behaviour? What could be wrong?
Thanks,




Issues with partitionBy: FetchFailed

2014-09-21 Thread Julien Carme
Hello,

I am facing an issue with partitionBy, it is not clear whether it is a
problem with my code or with my spark setup. I am using Spark 1.1,
standalone, and my other spark projects work fine.

So I have to repartition a relatively large file (about 70 million lines).
Here is a minimal version of what is not working fine:

myRDD = sc.textFile("...").map { line => (extractKey(line),line) }
myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
myRepartitionedRDD.saveAsTextFile(...)

It runs quite some time, until I get some errors and it retries. Errors are:

FetchFailed(BlockManagerId(3,myWorker2, 52082,0),
shuffleId=1,mapId=1,reduceId=5)

Logs are not much more informative. I get:

Java.io.IOException : sendMessageReliability failed because ack was not
received within 60 sec

I get similar errors with all my workers.

Do you have some kind of explanation for this behaviour? What could be
wrong?

Thanks,


Setting up Spark 1.1 on Windows 7

2014-09-21 Thread Khaja M
Hi:

I am trying to setup Spark 1.1 on a Windows 7 box and I am running the sbt
assembly command and this is the error that I am seeing. 

[error] (streaming-flume-sink/*:update) sbt.ResolveException: unresolved
depende
ncy: commons-lang#commons-lang;2.6: configuration not found in
commons-lang#comm
ons-lang;2.6: 'compile'. It was required from
org.apache.avro#avro-compiler;1.7.
3 compile
[error] (core/*:update) sbt.ResolveException: unresolved dependency:
org.slf4j#s
lf4j-api;1.7.5: configuration not found in org.slf4j#slf4j-api;1.7.5:
'master'.
It was required from org.apache.spark#spark-core_2.10;1.1.0 compile
[error] Total time: 10 s, completed Sep 21, 2014 7:13:16 AM

I looked at my local maven repo and found that I do have both the
dependencies there:
1. commons-lang 2.6
2. org.slf4j-api 1.7.5

Any ideas on what I am missing?
Thanks,
-Khaja



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Setting-up-Spark-1-1-on-Windows-7-tp14759.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Saving RDD with array of strings

2014-09-21 Thread Sarath Chandra
Hi All,

If my RDD is having array/sequence of strings, how can I save them as a
HDFS file with each string on separate line?

For example if I write code as below, the output should get saved as hdfs
file having one string per line
...
...
var newLines = lines.map(line => myfunc(line));
newLines.saveAsTextFile(hdfsPath);
...
...
def myfunc(line: String):Array[String] = {
  line.split(";");
}

Thanks,
~Sarath.


Re: Distributed dictionary building

2014-09-21 Thread Sean Owen
Reference - https://issues.apache.org/jira/browse/SPARK-3098
I imagine zipWithUniqueID is also affected, but may not happen to have
exhibited in your test.

On Sun, Sep 21, 2014 at 2:13 AM, Debasish Das  wrote:
> Some more debug revealed that as Sean said I have to keep the dictionaries
> persisted till I am done with the RDD manipulation.
>
> Thanks Sean for the pointer...would it be possible to point me to the JIRA
> as well ?
>
> Are there plans to make it more transparent for the users ?
>
> Is it possible for the DAG to speculate such things...similar to branch
> prediction ideas from comp arch...
>
>
>
> On Sat, Sep 20, 2014 at 1:56 PM, Debasish Das 
> wrote:
>>
>> I changed zipWithIndex to zipWithUniqueId and that seems to be working...
>>
>> What's the difference between zipWithIndex vs zipWithUniqueId ?
>>
>> For zipWithIndex we don't need to run the count to compute the offset
>> which is needed for zipWithUniqueId and so zipWithIndex is efficient ? It's
>> not very clear from docs...
>>
>>
>> On Sat, Sep 20, 2014 at 1:48 PM, Debasish Das 
>> wrote:
>>>
>>> I did not persist / cache it as I assumed zipWithIndex will preserve
>>> order...
>>>
>>> There is also zipWithUniqueId...I am trying that...If that also shows the
>>> same issue, we should make it clear in the docs...
>>>
>>> On Sat, Sep 20, 2014 at 1:44 PM, Sean Owen  wrote:

 From offline question - zipWithIndex is being used to assign IDs. From a
 recent JIRA discussion I understand this is not deterministic within a
 partition so the index can be different when the RDD is reevaluated. If you
 need it fixed, persist the zipped RDD on disk or in memory.

 On Sep 20, 2014 8:10 PM, "Debasish Das" 
 wrote:
>
> Hi,
>
> I am building a dictionary of RDD[(String, Long)] and after the
> dictionary is built and cached, I find key "almonds" at value 5187 using:
>
> rdd.filter{case(product, index) => product == "almonds"}.collect
>
> Output:
>
> Debug product almonds index 5187
>
> Now I take the same dictionary and write it out as:
>
> dictionary.map{case(product, index) => product + "," + index}
> .saveAsTextFile(outputPath)
>
> Inside the map I also print what's the product at index 5187 and I get
> a different product:
>
> Debug Index 5187 userOrProduct cardigans
>
> Is this an expected behavior from map ?
>
> By the way "almonds" and "apparel-cardigans" are just one off in the
> index...
>
> I am using spark-1.1 but it's a snapshot..
>
> Thanks.
> Deb
>
>
>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org