Block

2014-03-11 Thread David Thomas
What is the concept of Block and BlockManager in Spark? How is a Block
related to a Partition of an RDD?


pyspark broadcast error

2014-03-11 Thread Brad Miller
Hi All,

When I run the program shown below, I receive the error shown below.
I am running the current version of branch-0.9 from GitHub.  Note that
I do not receive the error when I replace 2 ** 29 with 2 ** X,
where X < 29.  More interestingly, I do not receive the error when X =
30, and when X > 30 the code either crashes with MemoryError or
Py4JNetworkError: An error occurred while trying to connect to the
Java server.

I am aware that there are some bugs
(https://spark-project.atlassian.net/browse/SPARK-1065) related to
memory consumption with pyspark and broadcasting, but the behavior
with X = 29 seemed different and I was wondering if anybody had any
insight.

-Brad

*Program*
from pyspark import SparkContext
SparkContext.setSystemProperty('spark.executor.memory', '25g')
sc = SparkContext('spark://crosby.research.intel-research.net:7077',
'FeatureExtraction')
meg_512 = range((2 ** 29) / 8)
tmp_broad = sc.broadcast(meg_512)

*Error*
---
Py4JError Traceback (most recent call last)
<ipython-input-1-db8033dee301> in <module>()
  3 sc = SparkContext('spark://crosby.research.intel-research.net:7077',
'FeatureExtraction')
  4 meg_1024 = range((2 ** 29) / 8)
----> 5 tmp_broad = sc.broadcast(meg_1024)

/home/spark/spark-branch-0.9/python/pyspark/context.py in broadcast(self, value)
277 pickleSer = PickleSerializer()
278 pickled = pickleSer.dumps(value)
--> 279 jbroadcast = self._jsc.broadcast(bytearray(pickled))
280 return Broadcast(jbroadcast.id(), value, jbroadcast,
281  self._pickled_broadcast_vars)

/home/spark/spark-branch-0.9/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
535 answer = self.gateway_client.send_command(command)
536 return_value = get_return_value(answer, self.gateway_client,
--> 537 self.target_id, self.name)
538
539 for temp_arg in temp_args:

/home/spark/spark-branch-0.9/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
302 raise Py4JError(
303 'An error occurred while calling
{0}{1}{2}. Trace:\n{3}\n'.
--> 304 format(target_id, '.', name, value))
305 else:
306 raise Py4JError(

Py4JError: An error occurred while calling o7.broadcast. Trace:
java.lang.NegativeArraySizeException
at py4j.Base64.decode(Base64.java:292)
at py4j.Protocol.getBytes(Protocol.java:167)
at py4j.Protocol.getObject(Protocol.java:276)
at py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
at py4j.commands.CallCommand.execute(CallCommand.java:77)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:701)


building spark over proxy

2014-03-11 Thread hades dark
Can someone help me with how to build Spark behind a proxy?

-- 
REGARDS
ASHUTOSH JAIN
IIT-BHU VARANASI


Re: building spark over proxy

2014-03-11 Thread Bharath Vissapragada
http://mail-archives.apache.org/mod_mbox/spark-user/201403.mbox/%3ccaaqhkj48japuzqc476es67c+rrfime87uprambdoofhcl0k...@mail.gmail.com%3E


On Tue, Mar 11, 2014 at 11:44 AM, hades dark hades.o...@gmail.com wrote:

 Can someone help me on how to build spark over proxy settings ..

 --
 REGARDS
 ASHUTOSH JAIN
 IIT-BHU VARANASI




-- 
Bharath Vissapragada
http://www.cloudera.com


Reading sequencefile

2014-03-11 Thread Jaonary Rabarisoa
Hi all,

I'm trying to read a sequenceFile that represent a set of jpeg image
generated using this tool :
http://stuartsierra.com/2008/04/24/a-million-little-files . According to
the documentation : Each key is the name of a file (a Hadoop “Text”), the
value is the binary contents of the file (a BytesWritable)

How do I load the generated file inside Spark?
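
For reference, a minimal sketch (hypothetical path and local master, adjust as
needed) of reading such a file with the Scala API, assuming (Text,
BytesWritable) records as described above:

  import org.apache.hadoop.io.{BytesWritable, Text}
  import org.apache.spark.SparkContext

  val sc = new SparkContext("local", "ReadImageSeqFile")
  // Each record: key = original file name (Text), value = raw file bytes (BytesWritable)
  val images = sc.sequenceFile("hdfs://namenode:9000/images.seq",
                               classOf[Text], classOf[BytesWritable])
  // Hadoop reuses Writable objects, so copy the bytes out before caching or collecting
  val decoded = images.map { case (name, bytes) =>
    (name.toString, bytes.getBytes.take(bytes.getLength))
  }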

Cheers,

Jaonary


Spark stand alone cluster mode

2014-03-11 Thread Gino Mathews
Hi,

I am new to Spark.
I would like to run jobs in Spark standalone cluster mode.

No cluster manager other than Spark is used.
(https://spark.apache.org/docs/0.9.0/spark-standalone.html)
I have tried wordcount from the spark-shell and as a standalone Scala app.

The code reads input from HDFS and writes the results to HDFS, using 2 worker
nodes.

In spark-shell the wordcount is successful; however, my efforts to run the
standalone program have been in vain.

My environment:
Ubuntu 12.04 - 32 bit
Java 1.7.0_51
I have installed Spark at $HOME/Downloads/spark-0.9.0-incubating
installed Hadoop 2.2.0 as a separate hduser and given permission to other users.
installed Scala 2.10.3
installed sbt 0.13.1
The Spark master acts as the HDFS master.
I have one master and 2 worker nodes, and HDFS is accessible from all nodes.
I downloaded an example project and modified it to use my Spark cluster.
I started the Spark cluster at spark://192.168.0.138:7077
and HDFS at hdfs://master:9000/
When I run the project with SPARK_HADOOP_VERSION=2.2.0 sbt run, I get the following
error

gino@master:~/Test/spark-example-project$ SPARK_HADOOP_VERSION=2.2.0 sbt run
[info] Loading project definition from 
/home/gino/Test/spark-example-project/project
[info] Set current project to spark-example-project (in build 
file:/home/gino/Test/spark-example-project/)
[info] Running com.Thinkpalm.spark.WordCountHDFS
[error] (run-main-0) java.lang.NoClassDefFoundError: 
org/apache/spark/SparkContext
java.lang.NoClassDefFoundError: org/apache/spark/SparkContext
at com.Thinkpalm.spark.WordCountHDFS$.main(WordCountHDFS.scala:12)
at com.Thinkpalm.spark.WordCountHDFS.main(WordCountHDFS.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.SparkContext
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at com.Thinkpalm.spark.WordCountHDFS$.main(WordCountHDFS.scala:12)
at com.Thinkpalm.spark.WordCountHDFS.main(WordCountHDFS.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
[trace] Stack trace suppressed: run last compile:run for the full output.
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 0 s, completed Mar 11, 2014 2:54:54 PM

Could anyone give me some pointers? I have attached the project for reference.

Thanks and regards
Gino Mathews


spark-example-project.tgz
Description: spark-example-project.tgz


Re: Spark stand alone cluster mode

2014-03-11 Thread Yana Kadiyska
Does 'sbt show full-classpath' show spark-core on the classpath? I am
still pretty new to Scala, but it seems like you have
val sparkCore = "org.apache.spark" %% "spark-core" % V.spark % "provided"
-- I believe the "provided" part means sbt assumes it is already on your
runtime classpath, so 'sbt run' won't put it there for you. The spark-shell
script sets up a lot of that for you, so...
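
If that is the cause, a minimal build.sbt sketch (hypothetical versions --
adjust to whatever you actually build against) that keeps spark-core on the
runtime classpath for 'sbt run':

  name := "spark-example-project"

  scalaVersion := "2.10.3"

  resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

  // No "provided" qualifier, so 'sbt run' can see org.apache.spark.SparkContext.
  // Switch back to % "provided" if you later build an assembly for a cluster
  // that already ships Spark.
  libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"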

On Tue, Mar 11, 2014 at 9:02 AM, Gino Mathews gin...@thinkpalm.com wrote:
 Hi,

 I am new to spark.
 I would like to run jobs  in Spark stand alone cluster mode.

 No cluser managers other than spark is used.
 (https://spark.apache.org/docs/0.9.0/spark-standalone.html)
 I have tried wordcount from spark shell and stand alone scala app.

 The code reads input from HDFS and writes the results to HDFS. uses 2 worker
 nodes.

 In spark-shell the wordcount is successful, how ever my effort to run stand
 alone programmes are in vain.



 My environement

 Ubuntu 12.04  - 32 bit

 JAVA 1.7.0_51

 I have installed spark @ $HOME/Downloads/spark-0.9.0-incubating
 installed hadoop 2.2.0 as separate hduser and given permission to other
 users.
 installed scala 2.10.3
 installed sbt 0.13.1

 Spark master act as HDFS master
 I have one master and 2 worker nodes and HDFS is accessible in all nodes.
 I downloaded example project and modified to use my spark cluster.
 I started the sparkcluster at spark://192.168.0.138:7077
 and hdfs://master:9000/
 When I run the project as SPARK_HADOOP_VERSION=2.2.0 sbt run, I get
 following error

 gino@master:~/Test/spark-example-project$ SPARK_HADOOP_VERSION=2.2.0 sbt run
 [info] Loading project definition from
 /home/gino/Test/spark-example-project/project
 [info] Set current project to spark-example-project (in build
 file:/home/gino/Test/spark-example-project/)
 [info] Running com.Thinkpalm.spark.WordCountHDFS
 [error] (run-main-0) java.lang.NoClassDefFoundError:
 org/apache/spark/SparkContext
 java.lang.NoClassDefFoundError: org/apache/spark/SparkContext
 at com.Thinkpalm.spark.WordCountHDFS$.main(WordCountHDFS.scala:12)
 at com.Thinkpalm.spark.WordCountHDFS.main(WordCountHDFS.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 Caused by: java.lang.ClassNotFoundException: org.apache.spark.SparkContext
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at com.Thinkpalm.spark.WordCountHDFS$.main(WordCountHDFS.scala:12)
 at com.Thinkpalm.spark.WordCountHDFS.main(WordCountHDFS.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 [trace] Stack trace suppressed: run last compile:run for the full output.
 java.lang.RuntimeException: Nonzero exit code: 1
 at scala.sys.package$.error(package.scala:27)
 [trace] Stack trace suppressed: run last compile:run for the full output.
 [error] (compile:run) Nonzero exit code: 1
 [error] Total time: 0 s, completed Mar 11, 2014 2:54:54 PM

 Could anyone give some pointers ... I have attached the project for
 reference.



 Thanks and regards

 Gino Mathews


Pyspark Memory Woes

2014-03-11 Thread Aaron Olson
Dear Sparkians,

We are working on a system to do relational modeling on top of Spark, all
done in pyspark. While we've been learning a lot about Spark internals so
far, we're currently running into memory issues and wondering how best to
profile to fix them. Here are our symptoms:

   - We're operating on data sets up to 80G in size of uncompressed JSON,
   66 million records in the largest one.
   - Sometimes we're joining those large data sets, but cardinality never
   exceeds 66 million (unless we've got a bug somewhere).
   - We're seeing various OOM problems: sometimes python takes all
   available mem, sometimes we OOM with no heap space left, and occasionally
   OOM with GC overhead limit exceeded.
   - Sometimes we also see what looks like a single huge message sent over
   the wire that exceeds the wire format limitations.
   - Java heap space is 1.5G per worker, 24 or 32 cores across 46 nodes. It
   seems like we should have more than enough to do this comfortably.
   - We're trying to isolate specific steps now, but every time it errors,
   we're partitioning (i.e. partitionByKey is involved somewhere).

We've been instrumenting according to the monitoring and tuning docs, but are
a bit at a loss as to where we're going wrong. We suspect poor/wrong
partitioning on our part somehow. With that in mind, some questions:

   - How exactly is partitioning information propagated? It looks like
   within a pipelined RDD the parent partitioning is preserved throughout
   unless we either specifically repartition or go through a reduce. We're
   splitting as much as we can on maps and letting reduces happen normally. Is
   that good practice?
   - When doing e.g. partitionByKey, does an entire partition get sent to
   one worker process?
   - When does Spark stream data? Are there easy ways to sabotage the
   streaming? Are there any knobs for us to twiddle here?
   - Is there any way to specify the number of shuffles for a given reduce
   step?
   - How can we get better insight into what our workers are doing,
   specifically around moving data in and out of python land?

I realise it's hard to troubleshoot in the absence of code but any test
case we have would be contrived. We're collecting more metrics and trying
to reason about what might be happening, but any guidance at this point
would be most helpful.

Thanks!

-- 
Aaron Olson
Data Engineer, Shopify


Spark usage patterns and questions

2014-03-11 Thread Sourav Chandra
Hi,

I have some questions regarding usage patterns and debugging in spark/spark
streaming.

1. What are some common design patterns for using broadcast variables? In my
application I created some, and also created a scheduled task which
periodically refreshes the variables. I want to know how people generally
achieve this efficiently and in a modular way.

2. Sometimes an uncaught exception in the driver program/worker does not get
traced anywhere. How can we debug this?

3. In our use case we read from Kafka, do some mapping, and lastly persist the
data to Cassandra as well as push it over a remote actor for realtime
updates in a dashboard. I used the approaches below (see the sketch after this
list for the one that worked):
 - First I tried a very naive way like stream.map(...).foreachRDD(
push to actor).
   It did not work and the stage failed with an akka exception.
 - Second I tried the
akka.serialization.JavaSerilizer.withSystem(system){...} approach.
   It did not work and the stage failed, BUT without any trace anywhere in the
logs.
 - Finally I did rdd.collect to collect the output into the driver and then
pushed it to the actor.
   It worked.
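
For reference, a rough sketch of that last approach (parseRecord and
dashboardActor are hypothetical stand-ins for our mapping function and the
driver-side ActorRef):

  stream.map(parseRecord).foreachRDD { rdd =>
    val batch = rdd.collect()  // materialize the micro-batch on the driver
    batch.foreach(record => dashboardActor ! record)  // push from the driver,
    // so nothing actor-related has to be serialized out to the workers
  }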

I would like to know whether there is an efficient way of achieving this sort
of use case.

4. Sometimes I see failed stages, but when I open the stage details it says
the stage did not start. What does this mean?

Looking forward for some interesting responses :)

Thanks,
-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Re: NO SUCH METHOD EXCEPTION

2014-03-11 Thread Matei Zaharia
Since it’s from Scala, it might mean you’re running with a different version of 
Scala than you compiled Spark with. Spark 0.8 and earlier use Scala 2.9, while 
Spark 0.9 uses Scala 2.10.
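
For example, a build.sbt sketch (assuming sbt, with hypothetical versions) that
keeps the application's Scala version in line with the Spark 0.9 artifacts:

  // Spark 0.9.x is built for Scala 2.10, so compile the application with 2.10 too.
  scalaVersion := "2.10.3"

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"            % "0.9.0-incubating",
    "org.apache.spark" %% "spark-streaming-kafka" % "0.9.0-incubating"
  )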

Matei

On Mar 11, 2014, at 8:19 AM, Jeyaraj, Arockia R (Arockia) 
arockia.r.jeya...@verizon.com wrote:

 Hi,
  
 Can anyone help me to resolve this issue? Why am I getting NoSuchMethod 
 exception?
  
 14/03/11 09:56:11 ERROR executor.Executor: Exception in task ID 0
 java.lang.NoSuchMethodError: 
 scala.Predef$.augmentString(Ljava/lang/String;)Lsca
 la/collection/immutable/StringOps;
 at kafka.utils.VerifiableProperties.getIntInRange(VerifiableProperties.s
 cala:75)
 at kafka.utils.VerifiableProperties.getInt(VerifiableProperties.scala:58
 )
 at kafka.utils.ZKConfig.init(ZkUtils.scala:837)
 at kafka.consumer.ConsumerConfig.init(ConsumerConfig.scala:73)
 at kafka.consumer.ConsumerConfig.init(ConsumerConfig.scala:77)
 at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStr
 eam.scala:98)
 at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInput
 DStream.scala:126)
 at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExec
 utor$$anonfun$8.apply(NetworkInputTracker.scala:173)
 at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExec
 utor$$anonfun$8.apply(NetworkInputTracker.scala:169)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.sc
 ala:884)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.sc
 ala:884)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mc
 V$sp(Executor.scala:213)
 at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.sca
 la:49)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
 
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec
 utor.java:886)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
 .java:908)
 at java.lang.Thread.run(Thread.java:619)
 14/03/11 09:56:11 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
 14/03/11 09:56:11 WARN scheduler.TaskSetManager: Loss was due to 
 java.lang.NoSuc
 hMethodError
 java.lang.NoSuchMethodError: 
 scala.Predef$.augmentString(Ljava/lang/String;)Lsca
 la/collection/immutable/StringOps;
 at kafka.utils.VerifiableProperties.getIntInRange(VerifiableProperties.s
 cala:75)
 at kafka.utils.VerifiableProperties.getInt(VerifiableProperties.scala:58
 )
 at kafka.utils.ZKConfig.init(ZkUtils.scala:837)
 at kafka.consumer.ConsumerConfig.init(ConsumerConfig.scala:73)
 at kafka.consumer.ConsumerConfig.init(ConsumerConfig.scala:77)
 at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStr
 eam.scala:98)
 at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInput
 DStream.scala:126)
 at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExec
 utor$$anonfun$8.apply(NetworkInputTracker.scala:173)
 at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExec
 utor$$anonfun$8.apply(NetworkInputTracker.scala:169)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.sc
 ala:884)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.sc
 ala:884)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mc
 V$sp(Executor.scala:213)
 at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.sca
 la:49)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
 
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec
 utor.java:886)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
 .java:908)
 at java.lang.Thread.run(Thread.java:619)
 14/03/11 09:56:11 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; 
 abo
 rting job
 14/03/11 09:56:11 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from 
 pool
 
 14/03/11 09:56:11 INFO scheduler.DAGScheduler: Failed to run runJob at 
 NetworkIn
 putTracker.scala:182
 [error] (Thread-34) org.apache.spark.SparkException: Job aborted: Task 0.0:0 
 fai
 
  
  
 Thanks
 Arockia Raja



Re: Powered By Spark Page -- Companies Organizations

2014-03-11 Thread Matei Zaharia
Thanks, added you.

On Mar 11, 2014, at 2:47 AM, Christoph Böhm listenbru...@gmx.net wrote:

 Dear Spark team,
 
 thanks for the great work and congrats on becoming an Apache top-level 
 project!
 
 You could add us to your Powered-by-page, because we are using Spark (and 
 Shark) to perform interactive exploration of large datasets.
 Find us on: www.bakdata.com
 
 Best,
 Christoph
 
 -
 Christoph Böhm
 
 bakdata | bespoke data engineering 
 www.bakdata.com



Re: Pyspark Memory Woes

2014-03-11 Thread Sandy Ryza
Hi Aaron,

When you say Java heap space is 1.5G per worker, 24 or 32 cores across 46
nodes. It seems like we should have more than enough to do this
comfortably., how are you configuring this?

-Sandy


On Tue, Mar 11, 2014 at 10:11 AM, Aaron Olson aaron.ol...@shopify.comwrote:

 Dear Sparkians,

 We are working on a system to do relational modeling on top of Spark, all
 done in pyspark. While we've been learning a lot about Spark internals so
 far, we're currently running into memory issues and wondering how best to
 profile to fix them. Here are our symptoms:

- We're operating on data sets up to 80G in size of uncompressed JSON,
66 million records in the largest one.
- Sometimes we're joining those large data sets, but cardinality never
exceeds 66 million (unless we've got a bug somewhere).
- We're seeing various OOM problems: sometimes python takes all
available mem, sometimes we OOM with no heap space left, and occasionally
OOM with GC overhead limit exceeded.
- Sometimes we also see what looks like a single huge message sent
over the wire that exceeds the wire format limitations.
- Java heap space is 1.5G per worker, 24 or 32 cores across 46 nodes.
It seems like we should have more than enough to do this comfortably.
- We're trying to isolate specific steps now, but every time it
errors, we're partitioning (i.e. partitionByKey is involved somewhere).

 We've been instrumneting according to the monitoring and tuning docs, but
 a bit at a loss for where we're going wrong. We suspect poor/wrong
 partitioning on our part somehow. With that in mind, some questions:

- How exactly is partitioning information propagated? It looks like
within a pipelined RDD the parent partitioning is preserved throughout
unless we either specifically repartition or go through a reduce. We're
splitting as much as we can on maps and letting reduces happen normally. Is
that good practice?
- When doing e.g. partitionByKey, does an entire partition get sent to
one worker process?
- When does Spark stream data? Are there easy ways to sabotage the
streaming? Are there any knobs for us to twiddle here?
- Is there any way to specify the number of shuffles for a given
reduce step?
- How can we get better insight into what our workers are doing,
specifically around moving data in and out of python land?

 I realise it's hard to troubleshoot in the absence of code but any test
 case we have would be contrived. We're collecting more metrics and trying
 to reason about what might be happening, but any guidance at this point
 would be most helpful.

 Thanks!

 --
 Aaron Olson
 Data Engineer, Shopify



is spark.cleaner.ttl safe?

2014-03-11 Thread Michael Allman

Hello,

I've been trying to run an iterative spark job that spills 1+ GB to disk 
per iteration on a system with limited disk space. I believe there's 
enough space if spark would clean up unused data from previous iterations, 
but as it stands the number of iterations I can run is limited by 
available disk space.


I found a thread on the usage of spark.cleaner.ttl on the old Spark Users 
Google group here:


https://groups.google.com/forum/#!topic/spark-users/9ebKcNCDih4

I think this setting may be what I'm looking for, however the cleaner 
seems to delete data that's still in use. The effect is I get bizarre 
exceptions from Spark complaining about missing broadcast data or 
ArrayIndexOutOfBounds. When is spark.cleaner.ttl safe to use? Is it 
supposed to delete in-use data or is this a bug/shortcoming?
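
For reference, a minimal sketch of how the setting is applied (assuming the
SparkConf API in 0.9 and a hypothetical master URL; the value is a TTL in
seconds):

  import org.apache.spark.{SparkConf, SparkContext}

  // Ask Spark's metadata cleaner to drop state older than one hour. Whether
  // this also removes data still needed by live RDDs is exactly the question
  // above.
  val conf = new SparkConf()
    .setMaster("spark://master:7077")
    .setAppName("IterativeJob")
    .set("spark.cleaner.ttl", "3600")
  val sc = new SparkContext(conf)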


Cheers,

Michael




RE: unsubscribe

2014-03-11 Thread Kapil Malik
Ohh !

I thought you were unsubscribing :)



Kapil Malik | kma...@adobe.com | 33430 / 8800836581



-Original Message-
From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: 12 March 2014 00:51
To: user@spark.apache.org
Subject: Re: unsubscribe



To unsubscribe from this list, please send a message to 
user-unsubscr...@spark.apache.org and
it will automatically unsubscribe you.



Matei







On Mar 11, 2014, at 12:15 PM, Abhishek Pratap 
apra...@sagebase.org wrote:








Re: Pyspark Memory Woes

2014-03-11 Thread Sandy Ryza
Are you aware that you get an executor (and the 1.5GB) per machine, not per
core?



On Tue, Mar 11, 2014 at 12:52 PM, Aaron Olson aaron.ol...@shopify.comwrote:

 Hi Sandy,

 We're configuring that with the JAVA_OPTS environment variable in
 $SPARK_HOME/spark-worker-env.sh like this:

 # JAVA OPTS
 export SPARK_JAVA_OPTS="-Dspark.ui.port=0 -Dspark.default.parallelism=1024
 -Dspark.cores.max=256 -Dspark.executor.memory=1500m
 -Dspark.worker.timeout=500 -Dspark.akka.timeout=500"

 Does that value seem low to you?

 -Aaron




 On Tue, Mar 11, 2014 at 3:08 PM, Sandy Ryza sandy.r...@cloudera.comwrote:

 Hi Aaron,

 When you say Java heap space is 1.5G per worker, 24 or 32 cores across
 46 nodes. It seems like we should have more than enough to do this
 comfortably., how are you configuring this?

 -Sandy


 On Tue, Mar 11, 2014 at 10:11 AM, Aaron Olson aaron.ol...@shopify.comwrote:

 Dear Sparkians,

 We are working on a system to do relational modeling on top of Spark,
 all done in pyspark. While we've been learning a lot about Spark internals
 so far, we're currently running into memory issues and wondering how best
 to profile to fix them. Here are our symptoms:

- We're operating on data sets up to 80G in size of uncompressed
JSON, 66 million records in the largest one.
- Sometimes we're joining those large data sets, but cardinality
never exceeds 66 million (unless we've got a bug somewhere).
- We're seeing various OOM problems: sometimes python takes all
available mem, sometimes we OOM with no heap space left, and occasionally
OOM with GC overhead limit exceeded.
- Sometimes we also see what looks like a single huge message sent
over the wire that exceeds the wire format limitations.
- Java heap space is 1.5G per worker, 24 or 32 cores across 46
nodes. It seems like we should have more than enough to do this 
 comfortably.
- We're trying to isolate specific steps now, but every time it
errors, we're partitioning (i.e. partitionByKey is involved somewhere).

 We've been instrumneting according to the monitoring and tuning docs,
 but a bit at a loss for where we're going wrong. We suspect poor/wrong
 partitioning on our part somehow. With that in mind, some questions:

- How exactly is partitioning information propagated? It looks like
within a pipelined RDD the parent partitioning is preserved throughout
unless we either specifically repartition or go through a reduce. We're
splitting as much as we can on maps and letting reduces happen normally. 
 Is
that good practice?
- When doing e.g. partitionByKey, does an entire partition get sent
to one worker process?
- When does Spark stream data? Are there easy ways to sabotage the
streaming? Are there any knobs for us to twiddle here?
- Is there any way to specify the number of shuffles for a given
reduce step?
- How can we get better insight into what our workers are doing,
specifically around moving data in and out of python land?

 I realise it's hard to troubleshoot in the absence of code but any test
 case we have would be contrived. We're collecting more metrics and trying
 to reason about what might be happening, but any guidance at this point
 would be most helpful.

 Thanks!

 --
 Aaron Olson
 Data Engineer, Shopify





 --
 Aaron Olson
 Data Engineer, Shopify



Re: Out of memory on large RDDs

2014-03-11 Thread Grega Kespret
 Your input data read as RDD may be causing OOM, so thats where you can use 
 different memory configuration. 

We are not getting any OOM exceptions, just akka future timeouts in 
mapoutputtracker and unsuccessful get of shuffle outputs, therefore refetching 
them. 

What is the industry practice when going about debugging such errors? 

Questions:
- why are mapoutputtrackers timing out? ( and how to debug this properly?)
- what is the task/purpose of mapoutputtracker?
- how to check per-task objects size?

Thanks,
Grega

 On 11 Mar 2014, at 18:43, Mayur Rustagi mayur.rust...@gmail.com wrote:
 
 Shuffle data is always stored on disk, its unlikely to cause OOM. Your input 
 data read as RDD may be causing OOM, so thats where you can use different 
 memory configuration. 
 
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi
 
 
 
 On Tue, Mar 11, 2014 at 9:20 AM, sparrow do...@celtra.com wrote:
 I don't understand how exactly will that help. There are no persisted RDD's 
 in storage. Our input data is ~ 100GB, but output of the flatMap is ~40Mb. 
 The small RDD is then persisted. 
 
 Memory configuration should not affect shuffle data if I understand you 
 correctly?
 
 
 
 
 
 
 
 On Tue, Mar 11, 2014 at 4:52 PM, Mayur Rustagi [via Apache Spark User List] 
 [hidden email] wrote:
 Shuffle data is not kept in memory. Did you try additional memory 
 configurations( 
 https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence)
  
 
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi
 
 
 
 On Tue, Mar 11, 2014 at 8:35 AM, Domen Grabec [hidden email] wrote:
 Hi
 
 I have a spark cluster with 4 workers each with 13GB ram. I would like to 
 process a large data set (does not fit in memory) that consists of JSON 
 entries. These are the transformations applied:
 
 SparkContext.textFile(s3url)            // read files from s3
   .keyBy(_.parseJson.id)                // key by the id located in the json string
   .groupByKey(number_of_group_tasks)    // group by id
   .flatMap { case (key, lines) => /* do some stuff */ }
 
 In the web view I can see a key by operation doing a shuffle write. If I 
 understand correctly the groupByKey transformation creates a wide RDD 
 dependency thus requiring a shuffle write. I have already increased 
 spark.akka.askTimeout to 30 seconds and still job fails with errors on 
 workers:
 
 Error communicating with MapOutputTracker
 at 
 org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
 at 
 org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
 at 
 org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
 at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
 at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
 at 
 org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
 at 
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
 at 
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
 at 
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
 at 
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
 at 
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at 
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
 at 
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
 [3] milliseconds
 at akka.dispatch.DefaultPromise.ready(Future.scala:870)
 at akka.dispatch.DefaultPromise.result(Future.scala:874)
 at akka.dispatch.Await$.result(Future.scala:74)
 at 
 org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)

possible bug in Spark's ALS implementation...

2014-03-11 Thread Michael Allman

Hi,

I'm implementing a recommender based on the algorithm described in 
http://www2.research.att.com/~yifanhu/PUB/cf.pdf. This algorithm forms the 
basis for Spark's ALS implementation for data sets with implicit features. 
The data set I'm working with is proprietary and I cannot share it, 
however I can say that it's based on the same kind of data in the 
paper---relative viewing time of videos. (Specifically, the rating for 
each video is defined as total viewing time across all visitors divided by 
video duration).


I'm seeing counterintuitive, sometimes nonsensical recommendations. For 
comparison, I've run the training data through Oryx's in-VM implementation 
of implicit ALS with the same parameters. Oryx uses the same algorithm. 
(Source in this file: 
https://github.com/cloudera/oryx/blob/master/als-common/src/main/java/com/cloudera/oryx/als/common/factorizer/als/AlternatingLeastSquares.java)


The recommendations made by each system compared to one another are very
different---more so than I think could be explained by differences in
initial state. The recommendations made by the Oryx models look much 
better, especially as I increase the number of latent factors and the 
iterations. The Spark models' recommendations don't improve with increases 
in either latent factors or iterations. Sometimes, they get worse.


Because of the (understandably) highly-optimized and terse style of 
Spark's ALS implementation, I've had a very hard time following it well 
enough to debug the issue definitively. However, I have found a section of 
code that looks incorrect. As described in the paper, part of the implicit 
ALS algorithm involves computing a matrix product YtCuY (equation 4 in the 
paper). To optimize this computation, this expression is rewritten as YtY 
+ Yt(Cu - I)Y. I believe that's what should be happening here:


https://github.com/apache/incubator-spark/blob/v0.9.0-incubating/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L376

However, it looks like this code is in fact computing YtY + YtY(Cu - I), 
which is the same as YtYCu. If so, that's a bug. Can someone familiar with 
this code evaluate my claim?
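
To spell out the algebra: the optimization in the paper relies on the identity

  Yt Cu Y = Yt Y + Yt (Cu - I) Y,

whereas the expression I believe the code is building,

  Yt Y + Yt Y (Cu - I) = Yt Y Cu,

is not the same thing, since matrix multiplication does not commute
(Yt (Cu - I) Y != Yt Y (Cu - I) in general).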


Cheers,

Michael


Re: possible bug in Spark's ALS implementation...

2014-03-11 Thread Xiangrui Meng
Hi Michael,

I can help check the current implementation. Would you please go to
https://spark-project.atlassian.net/browse/SPARK and create a ticket
about this issue with component MLlib? Thanks!

Best,
Xiangrui

On Tue, Mar 11, 2014 at 3:18 PM, Michael Allman m...@allman.ms wrote:
 Hi,

 I'm implementing a recommender based on the algorithm described in
 http://www2.research.att.com/~yifanhu/PUB/cf.pdf. This algorithm forms the
 basis for Spark's ALS implementation for data sets with implicit features.
 The data set I'm working with is proprietary and I cannot share it, however
 I can say that it's based on the same kind of data in the paper---relative
 viewing time of videos. (Specifically, the rating for each video is
 defined as total viewing time across all visitors divided by video
 duration).

 I'm seeing counterintuitive, sometimes nonsensical recommendations. For
 comparison, I've run the training data through Oryx's in-VM implementation
 of implicit ALS with the same parameters. Oryx uses the same algorithm.
 (Source in this file:
 https://github.com/cloudera/oryx/blob/master/als-common/src/main/java/com/cloudera/oryx/als/common/factorizer/als/AlternatingLeastSquares.java)

 The recommendations made by each system compared to one other are very
 different---moreso than I think could be explained by differences in initial
 state. The recommendations made by the Oryx models look much better,
 especially as I increase the number of latent factors and the iterations.
 The Spark models' recommendations don't improve with increases in either
 latent factors or iterations. Sometimes, they get worse.

 Because of the (understandably) highly-optimized and terse style of Spark's
 ALS implementation, I've had a very hard time following it well enough to
 debug the issue definitively. However, I have found a section of code that
 looks incorrect. As described in the paper, part of the implicit ALS
 algorithm involves computing a matrix product YtCuY (equation 4 in the
 paper). To optimize this computation, this expression is rewritten as YtY +
 Yt(Cu - I)Y. I believe that's what should be happening here:

 https://github.com/apache/incubator-spark/blob/v0.9.0-incubating/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L376

 However, it looks like this code is in fact computing YtY + YtY(Cu - I),
 which is the same as YtYCu. If so, that's a bug. Can someone familiar with
 this code evaluate my claim?

 Cheers,

 Michael


Re: How to create RDD from Java in-memory data?

2014-03-11 Thread wallacemann
Ah!  Thank you.  That'll work for now.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-RDD-from-Java-in-memory-data-tp2486p2570.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Applications for Spark on HDFS

2014-03-11 Thread Sandy Ryza
Hi Paul,

What do you mean by distributing the jars manually?  If you register jars
that are local to the client with SparkContext.addJars, Spark should handle
distributing them to the workers.  Are you taking advantage of this?
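
For example, a minimal sketch (hypothetical master URL and jar paths) of
registering an application jar when the context is created, so Spark ships it
to the executors; an hdfs:// path may also work, since the workers fetch the
jar themselves:

  import org.apache.spark.SparkContext

  val sc = new SparkContext(
    "spark://master:7077",
    "MyApp",
    System.getenv("SPARK_HOME"),
    Seq("/path/to/myapp.jar")  // or e.g. "hdfs://namenode:9000/jars/myapp.jar"
  )
  // Jars can also be added after construction:
  // sc.addJar("hdfs://namenode:9000/jars/myapp.jar")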

-Sandy


On Tue, Mar 11, 2014 at 3:09 PM, Paul Schooss paulmscho...@gmail.comwrote:

 Hello Folks,

 I was wondering if anyone had experience placing application jars for
 Spark onto HDFS. Currently I have distributing the jars manually and would
 love to source the jar via HDFS a la distributed caching with MR. Any
 ideas?

 Regards,

 Paul



Re: How to create RDD from Java in-memory data?

2014-03-11 Thread wallacemann
In a similar vein, it would be helpful to have an Iterable way to access the
data inside an RDD.  The collect method takes everything in the RDD and puts
it in a list, but this blows up memory.  Since everything I want is already
inside the RDD, it should be possible to iterate over the content without
replicating the array.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-RDD-from-Java-in-memory-data-tp2486p2568.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to create RDD from Java in-memory data?

2014-03-11 Thread Mark Hamstra
https://github.com/apache/incubator-spark/pull/421

Works pretty well, but it really needs to be enhanced to work with
AsyncRDDActions.
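
In the meantime, a sketch of the iterator-style access being discussed here
(assuming an API along the lines of RDD.toLocalIterator, which showed up in
later Spark releases and may be what the pull request above provides):
partitions are pulled to the driver one at a time, so only a single partition
has to fit in driver memory rather than the whole RDD:

  val rdd = sc.parallelize(1 to 1000000, 100)
  rdd.toLocalIterator.foreach { x =>
    // process each element on the driver without collect()-ing the whole RDD
  }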


On Tue, Mar 11, 2014 at 4:50 PM, wallacemann wall...@bandpage.com wrote:

 In a similar vein, it would be helpful to have an Iterable way to access
 the
 data inside an RDD.  The collect method takes everything in the RDD and
 puts
 in a list, but this blows up memory.  Since everything I want is already
 inside the RDD, it could be easy to iterate over the content without
 replicating the array.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-RDD-from-Java-in-memory-data-tp2486p2568.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: RDD.saveAs...

2014-03-11 Thread Matei Zaharia
I agree that we can’t keep adding these to the core API, partly because it will 
get unwieldy to maintain and partly just because each storage system will bring 
in lots of dependencies. We can simply have helper classes in different modules 
for each storage system. There’s some discussion on this at 
https://spark-project.atlassian.net/browse/SPARK-1127.

Matei

On Mar 11, 2014, at 9:06 AM, Koert Kuipers ko...@tresata.com wrote:

 I find the current design to write RDDs to disk (or a database, etc) kind of 
 ugly. It will lead to a proliferation of saveAs methods. A better abstraction 
 would be nice (perhaps a Sink trait to write to)
 



Re: Block

2014-03-11 Thread dachuan
In my opinion, the BlockManager manages many types of Blocks; an RDD's
partition, a.k.a. an RDDBlock, is one of them. Other types of Blocks are
ShuffleBlock, IndirectBlock (used if a task's return status is too large), etc.

So the BlockManager is a layer that is independent of the RDD concept.
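
As a rough illustration (a sketch of the block naming scheme; worth verifying
against the source for your Spark version), a cached RDD partition is stored by
the BlockManager under an id that encodes the RDD id and the partition index:

  // Hypothetical illustration of block id families:
  //   cached RDD partition : "rdd_<rddId>_<partitionIndex>", e.g. rdd_2_3
  //   shuffle output block : "shuffle_<shuffleId>_<mapId>_<reduceId>"
  //   broadcast data block : "broadcast_<broadcastId>"
  val rddId = 2
  val partitionIndex = 3
  val blockId = s"rdd_${rddId}_${partitionIndex}"  // the block holding partition 3 of RDD 2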
On Mar 11, 2014 2:06 AM, David Thomas dt5434...@gmail.com wrote:

 What is the concept of Block and BlockManager in Spark? How is a Block
 related to a Partition of a RDD?



Re: Are all transformations lazy?

2014-03-11 Thread Ewen Cheslack-Postava
You should probably be 
asking the opposite question: why do you think it *should* be applied 
immediately? Since the driver program hasn't requested any data back 
(distinct generates a new RDD, it doesn't return any data), there's no 
need to actually compute anything yet.

As the documentation describes, if the call returns an RDD, it's 
transforming the data and will just keep track of the operation it 
eventually needs to perform. Only methods that return data back to the 
driver should trigger any computation.

(The one known exception is sortByKey, which really should be lazy, but 
apparently uses an RDD.count call in its implementation: 
https://spark-project.atlassian.net/browse/SPARK-1021).
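
A small sketch of that behavior (local mode, made-up data): the map and
distinct calls below return new RDDs immediately without touching the data,
and nothing runs until collect, an action, is called:

  val sc = new org.apache.spark.SparkContext("local", "LazinessDemo")
  val nums = sc.parallelize(1 to 10)
  val distinctMod = nums.map { x => println("mapping " + x); x % 3 }.distinct()
  // No "mapping ..." lines have been printed yet; the RDDs only record the
  // operations they will eventually need to perform.
  val result = distinctMod.collect()  // collect is an action, so the work happens here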


   	   
David Thomas
March 11, 2014 at 9:49 PM

For example, is distinct() transformation lazy?

When I see the Spark source code, distinct applies a map -> reduceByKey -> map
function to the RDD elements. Why is this lazy? Won't the function be applied
immediately to the elements of RDD when I call someRDD.distinct?

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int): RDD[T] =
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): RDD[T] = distinct(partitions.size)




Re: Are all transformations lazy?

2014-03-11 Thread David Thomas
I think you misunderstood my question - I should have stated it better. I'm
not saying it should be applied immediately, but I'm trying to understand
how Spark achieves this lazy computation transformations. May be this is
due to my ignorance of how Scala works, but when I see the code, I see that
the function is applied to the elements of RDD when I call distinct - or is
it not applied immediately? How does the returned RDD 'keep track of the
operation'?
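
(For what it's worth, a highly simplified, hypothetical sketch of the idea --
not Spark's actual code: a transformation returns a new object that only
stores its parent and the function, and the function is applied only when
something eventually asks for the data:)

  abstract class MiniRDD[T] {
    def compute(): Iterator[T]  // produces the data when asked
    def map[U](f: T => U): MiniRDD[U] = {
      val parent = this
      new MiniRDD[U] {
        // f is not applied here; it runs only when compute() is finally called
        def compute(): Iterator[U] = parent.compute().map(f)
      }
    }
  }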


On Tue, Mar 11, 2014 at 10:06 PM, Ewen Cheslack-Postava m...@ewencp.orgwrote:

 You should probably be asking the opposite question: why do you think it
 *should* be applied immediately? Since the driver program hasn't requested
 any data back (distinct generates a new RDD, it doesn't return any data),
 there's no need to actually compute anything yet.

 As the documentation describes, if the call returns an RDD, it's
 transforming the data and will just keep track of the operation it
 eventually needs to perform. Only methods that return data back to the
 driver should trigger any computation.

 (The one known exception is sortByKey, which really should be lazy, but
 apparently uses an RDD.count call in its implementation:
 https://spark-project.atlassian.net/browse/SPARK-1021).

   David Thomas dt5434...@gmail.com
  March 11, 2014 at 9:49 PM
 For example, is distinct() transformation lazy?

 when I see the Spark source code, distinct applies a map -> reduceByKey ->
 map function to the RDD elements. Why is this lazy? Won't the function be
 applied immediately to the elements of RDD when I call someRDD.distinct?

   /**
    * Return a new RDD containing the distinct elements in this RDD.
    */
   def distinct(numPartitions: Int): RDD[T] =
     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

   /**
    * Return a new RDD containing the distinct elements in this RDD.
    */
   def distinct(): RDD[T] = distinct(partitions.size)

