Re: Lost an executor error - Jobs fail

2014-04-15 Thread Aaron Davidson
Hmm, interesting. I created
https://issues.apache.org/jira/browse/SPARK-1499 to track the issue of
Workers continuously spewing bad executors, but the
real issue seems to be a combination of that and some other bug in Shark or
Spark which fails to handle the situation properly.

Please let us know if you can reproduce it (especially if deterministic!),
or if you can provide any more details about exceptions thrown. A
preliminary search didn't bring up much about the error code 101...


On Mon, Apr 14, 2014 at 10:03 PM, Praveen R prav...@sigmoidanalytics.com wrote:

 Unfortunately queries kept failing with SparkTask return code -101 errors, and they
 only started working after removing the troublesome node.

 FAILED: Execution Error, return code -101 from shark.execution.SparkTask

 I wish it were easy to reproduce. I shall try hard-removing write permissions on
 one node to see if the same error happens.
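 (One way to simulate that, assuming /mnt2/spark is one of the configured
 spark.local.dir entries; the paths here are just placeholders for whichever
 node/dir is being tested:

 sudo chmod -R a-w /mnt2/spark         # drop write permissions on the local dir
 # or: sudo mount -o remount,ro /mnt2  # remount the disk read-only, like the original error
 )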



 On Tue, Apr 15, 2014 at 9:17 AM, Aaron Davidson ilike...@gmail.com wrote:

 Cool! It's pretty rare to actually get logs from a wild hardware failure.
 The problem is as you said, that the executor keeps failing, but the worker
 doesn't get the hint, so it keeps creating new, bad executors.

 However, this issue should not have caused your cluster to fail to start
 up. In the linked logs, for instance, the shark shell started up just fine
 (though the shark prompt was lost among some of the log messages). Queries should
 have been able to execute just fine. Was this not the case?


 On Mon, Apr 14, 2014 at 7:38 AM, Praveen R 
 prav...@sigmoidanalytics.com wrote:

 Configuration comes from the spark-ec2 setup script, which sets spark.local.dir to
 use /mnt/spark, /mnt2/spark.
  The setup actually worked for quite some time, and then on one of the nodes
 there were some disk errors like:

 mv: cannot remove
 `/mnt2/spark/spark-local-20140409182103-c775/09/shuffle_1_248_0': Read-only
 file system
 mv: cannot remove
 `/mnt2/spark/spark-local-20140409182103-c775/24/shuffle_1_260_0': Read-only
 file system
 mv: cannot remove
 `/mnt2/spark/spark-local-20140409182103-c775/24/shuffle_2_658_0': Read-only
 file system

 I understand the issue is at the hardware level, but thought it would be great
 if Spark could handle it and avoid the cluster going down.


 On Mon, Apr 14, 2014 at 7:58 PM, giive chen thegi...@gmail.com wrote:

 Hi Praveen

 What is your spark.local.dir config?
 Do all your workers have this dir, and do all workers have the right permission on
 this dir?

 I think this is the reason for your error.

 Wisely Chen


 On Mon, Apr 14, 2014 at 9:29 PM, Praveen R 
 prav...@sigmoidanalytics.com wrote:

 Had the below error while running Shark queries on a 30-node cluster and was
 not able to start the Shark server or run any jobs.

 *14/04/11 19:06:52 ERROR scheduler.TaskSchedulerImpl: Lost an executor
 4 (already removed): Failed to create local directory (bad
 spark.local.dir?)*
 *Full log: *https://gist.github.com/praveenr019/10647049

 After spending quite some time, I found it was due to disk read errors
 on one node, and got the cluster working after removing that node.

 Wanted to know if there is any configuration (like akkaTimeout) which
 can handle this, or does Mesos help?

 Shouldn't the worker be marked dead in such a scenario, instead of
 making the cluster unusable, so the debugging can be done at leisure?

 Thanks,
 Praveen R









Re: storage.MemoryStore estimated size 7 times larger than real

2014-04-15 Thread Aaron Davidson
Hey, I was talking about something more like:

val size = 1024 * 1024
val numSlices = 8
val arr = Array.fill[Array[Int]](numSlices) { new Array[Int](size / numSlices) }
val rdd = sc.parallelize(arr, numSlices).cache()
val size2 = rdd.map(_.length).sum()
assert(size2 == size)

If I do this, I see 8 blocks are put into MemoryStore, each with a size of
512.1 KB, which adds up to almost exactly 4MB as expected.

Regarding your other questions:
Non-cached RDDs are not written back to disk, their results are simply not
stored anywhere. If the results are needed again, the RDD will be
recomputed. I'm not sure I understand your distinction between JVM and
Spark memory -- both arrays and cached RDDs are stored in the JVM heap.

Shuffle operations are unique in that they store intermediate output to
local disk immediately, in order to avoid overly expensive recomputation.
This shuffle data is always written to disk, whether or not the input
RDD(s) are cached, and the final output of the shuffle (the groupBy in your
example) will *not* be cached in memory unless explicitly requested.
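
To make that last point concrete, here is a minimal sketch (pairs is just a made-up
RDD of key/value pairs, not taken from your code):

val pairs = sc.parallelize(1 to 1000).map(i => (i % 10, i))
val grouped = pairs.groupByKey()         // shuffle files are written to local disk either way
grouped.count()                          // the grouped result is computed but not kept in memory
val cached = pairs.groupByKey().cache()  // only an explicit cache() keeps the grouped RDD in memory
cached.count()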



On Mon, Apr 14, 2014 at 8:48 PM, wxhsdp wxh...@gmail.com wrote:

 thanks for your help, Davidson!
 i modified
 val a: RDD[Int] = sc.parallelize(array).cache()
 to keep val a an RDD of Int, but it has the same result

 another question:
 JVM and Spark memory are located in different parts of system memory; the Spark
 code is executed in JVM memory, and an allocation like val e = new
 Array[Int](2*size) /*8MB*/ uses JVM memory. if not cached, generated RDDs are
 written back to disk; if cached, RDDs are copied to Spark memory for further
 use. is that right?

 val RDD_1 = RDD_0.groupByKey{...}
 the shuffle separates stages; can anyone tell me the memory/disk usage of the
 shuffle input RDD and shuffle output RDD under the condition that RDD_0, RDD_1 is
 cached or not?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/storage-MemoryStore-estimated-size-7-times-larger-than-real-tp4251p4256.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



groupByKey returns a single partition in a RDD?

2014-04-15 Thread Joe L
I want to apply the following transformations to 60GB of data on 7 nodes with
10GB of memory each. And I am wondering if the groupByKey() function returns an RDD
with a single partition for each key? If so, what will happen if the size of
the partition doesn't fit into that particular node?

rdd = sc.textFile("hdfs://...").map(parserFunc).groupByKey()



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/groupByKey-returns-a-single-partition-in-a-RDD-tp4264.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Comparing GraphX and GraphLab

2014-04-15 Thread Qi Song
Hi Debasish,
I found PageRank on LiveJournal cost less than 100 seconds for GraphX on your
EC2 setup. But when I use the example (LiveJournalPageRank) you provided on my
machines with the same LiveJournal dataset, it took more than 10 minutes.
Following are some details:

Environment: 8 machines, each with 2x Intel Xeon E5-2650 CPUs, 256GB memory, 6TB
hard disk + 480GB SSD, Infiniband, Debian Wheezy OS.
I use this command: ./bin/run-example
org.apache.spark.examples.graphx.LiveJournalPageRank local
hdfs://10.1.1.33:9000/dataset/LiveJournal.txt

Should I set more params to get a faster result?
Moreover, I want to know the default allocation of computing resources, as
run-example may not allow me to allocate them by myself.

Regards~
Qi Song



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Comparing-GraphX-and-GraphLab-tp3112p4265.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to stop system info output in spark shell

2014-04-15 Thread Wei Da
The solution:
Edit /opt/spark-0.9.0-incubating-bin-hadoop2/conf/log4j.properties, changing
Spark's output to WARN. Done!

Refer to:
https://github.com/amplab-extras/SparkR-pkg/blob/master/pkg/src/src/main/resources/log4j.properties#L8
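
For reference, the relevant line would look roughly like this (assuming the console
appender name used in Spark's bundled log4j.properties.template):

log4j.rootCategory=WARN, console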




eduardocalfaia wrote
 Have you already tried in conf/log4j.properties?
 log4j.rootCategory=OFF
 
  On 4/3/14, 13:46, weida xu wrote:
 Hi, alll

  When I start Spark in the shell, it automatically outputs some system
  info every minute, see below. Can I stop or block the output of this
  info? I tried the :silent command, but the automatic output remains.

 14/04/03 19:34:30 INFO MetadataCleaner: Ran metadata cleaner for 
 SHUFFLE_BLOCK_MANAGER
 14/04/03 19:34:30 INFO BlockManager: Dropping non broadcast blocks 
 older than 1396524270698
 14/04/03 19:34:30 INFO MetadataCleaner: Ran metadata cleaner for 
 BLOCK_MANAGER
 14/04/03 19:34:30 INFO BlockManager: Dropping broadcast blocks older 
 than 1396524270701
 14/04/03 19:34:30 INFO MetadataCleaner: Ran metadata cleaner for 
 BROADCAST_VARS
 14/04/03 19:34:30 INFO MetadataCleaner: Ran metadata cleaner for 
 HTTP_BROADCAST
 14/04/03 19:34:30 INFO MetadataCleaner: Ran metadata cleaner for 
 MAP_OUTPUT_TRACKER
 14/04/03 19:34:31 INFO MetadataCleaner: Ran metadata cleaner for 
 SPARK_CONTEXT
 14/04/03 19:34:31 INFO DAGScheduler: shuffleToMapStage 0 -- 0
 14/04/03 19:34:31 INFO DAGScheduler: stageIdToStage 0 -- 0
 14/04/03 19:34:31 INFO DAGScheduler: stageIdToJobIds 0 -- 0
 14/04/03 19:34:31 INFO DAGScheduler: pendingTasks 0 -- 0
 14/04/03 19:34:31 INFO DAGScheduler: jobIdToStageIds 0 -- 0
 14/04/03 19:34:31 INFO DAGScheduler: stageToInfos 0 -- 0
 14/04/03 19:34:31 INFO MetadataCleaner: Ran metadata cleaner for 
 DAG_SCHEDULER



 
 
 -- 
 Informativa sulla Privacy: http://www.unibs.it/node/8155





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-stop-system-info-output-in-spark-shell-tp3704p4266.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark program throws OutOfMemoryError

2014-04-15 Thread Qin Wei
Hi, all

My spark program always gives me the error java.lang.OutOfMemoryError: Java
heap space in my standalone cluster, here is my code:

object SimCalcuTotal {
  def main(args: Array[String]) {
    val sc = new SparkContext("spark://192.168.2.184:7077", "Sim Calcu Total",
      "/usr/local/spark-0.9.0-incubating-bin-hadoop2",
      Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
    // val sc = new SparkContext("local", "Score Calcu Total")

    val mongoRDD = sc.textFile("/home/deployer/uris.dat", 200)
    val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

    val newRDD = jsonRDD.map(arg => {
      // 0.5 for test
      var score = 0.5
      arg.put("score", score)
      arg
    })

    val resourcesRDD = jsonRDD.map(arg => arg.get("rid").toString.toLong).distinct

    // the program crashes at this line of code
    val bcResources = sc.broadcast(resourcesRDD.collect.toList)

    val resourceScoresRDD = newRDD.map(arg =>
      (arg.get("rid").toString.toLong, (arg.get("zid").toString,
        arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey()
    val resouceScores = sc.broadcast(resourceScoresRDD.collect.toMap)

    def calSim(item1: Long, item2: Long) = {
      val iv1 = resouceScores.value(item1)
      val iv2 = resouceScores.value(item2)

      // 0.5 for test
      var distance = 0.5
      if (distance > 0.05) {
        var json = new JSONObject()
        json.put("_id", item1.toString + item2.toString)
        json.put("rid1", item1)
        json.put("rid2", item2)
        json.put("sim", distance)
        json
      }
      else null
    }

    // val saveRDD = newRDD.map(arg => arg.toString)
    // newRDD.saveAsTextFile(args(1).toString)
    val similarityRDD = resourcesRDD.flatMap(resource => {
      for (other <- bcResources.value if resource < other) yield calSim(resource, other)
    }).filter(arg => arg != null)
    similarityRDD.saveAsTextFile("/home/deployer/sim")
  }
}

The data file “/home/deployer/uris.dat” is 2G, with lines like this:
{ "id" : 1, "a" : { "0" : 1 }, "rid" : 5487628, "zid" : 10550869 }

And here is my spark-env.sh
export SCALA_HOME=/usr/local/scala-2.10.3
export SPARK_MASTER_IP=192.168.2.184
export SPARK_MASTER_PORT=7077
export SPARK_LOCAL_IP=192.168.2.182
export SPARK_WORKER_MEMORY=20g
export SPARK_MEM=10g
export SPARK_JAVA_OPTS="-Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit"

There are two processes on my server when the spark program is
running(before it crashes): 
java -cp
:/usr/local/spark-0.9.0-incubating-bin-hadoop2/conf:/usr/local/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
-Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit -Xms4g -Xmx40g
-XX:MaxPermSize=10g -XX:-UseGCOverheadLimit -Xms512M -Xmx512M
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://spark@192.168.2.183:51339/user/CoarseGrainedScheduler 0
192.168.2.182 16 akka.tcp://sparkWorker@192.168.2.182:45588/user/Worker
app-20140415172433-0001 

java -cp
:/usr/local/spark-0.9.0-incubating-bin-hadoop2/conf:/usr/local/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
-Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m
org.apache.spark.deploy.worker.Worker spark://192.168.2.184:7077

Is there anybody who can help me? Thanks very much!!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-program-thows-OutOfMemoryError-tp4268.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


standalone vs YARN

2014-04-15 Thread ishaaq
Hi all,
I am evaluating Spark to use here at my work.

We have an existing Hadoop 1.x install which I am planning to upgrade to Hadoop
2.3.

I am trying to work out whether I should install YARN or simply just setup a
Spark standalone cluster. We already use ZooKeeper so it isn't a problem to
setup HA. I am puzzled however as to how the Spark nodes can coordinate on
data locality - i.e., assuming I install the nodes on the same machines as
the DFS data nodes, I don't understand how Spark can work out which nodes
should get which splits of the jobs?

Anyway, my bigger question remains: YARN or standalone? Which is the more
stable option currently? Which is the more future-proof option?

Thanks,
Ishaaq 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/standalone-vs-YARN-tp4271.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: standalone vs YARN

2014-04-15 Thread Surendranauth Hiraman
Prashant,

In another email thread several weeks ago, it was mentioned that YARN
support is considered beta until Spark 1.0. Is that not the case?

-Suren



On Tue, Apr 15, 2014 at 8:38 AM, Prashant Sharma scrapco...@gmail.com wrote:

 Hi Ishaaq,

  answers inline from what I know, I'd like to be corrected though.

 On Tue, Apr 15, 2014 at 5:58 PM, ishaaq ish...@gmail.com wrote:

 Hi all,
 I am evaluating Spark to use here at my work.

  We have an existing Hadoop 1.x install which I am planning to upgrade to
 Hadoop
 2.3.

  This is not really a requirement for Spark; if you are doing it for some
  other reason, great!


 I am trying to work out whether I should install YARN or simply just
 setup a
 Spark standalone cluster. We already use ZooKeeper so it isn't a problem
 to
 setup HA. I am puzzled however as to how the Spark nodes can coordinate on
 data locality - i.e., assuming I install the nodes on the same machines as
 the DFS data nodes, I don't understand how Spark can work out which nodes
 should get which splits of the jobs?

  This happens exactly the same way Hadoop's MapReduce figures out data
  locality, since we support Hadoop's InputFormats (which also carry the
  information on how data is partitioned). So having Spark workers share
  the same nodes as your DFS is a good idea.


 Anyway, my bigger question remains: YARN or standalone? Which is the more
 stable option currently? Which is the more future-proof option?


  Well, I think standalone is stable enough for all purposes, and Spark's YARN
  support has been keeping up with the latest Hadoop versions too. If you are
  already using YARN and don't want the hassle of setting up another cluster
  manager, you can probably prefer YARN.


 Thanks,
 Ishaaq



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/standalone-vs-YARN-tp4271.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Scala vs Python performance differences

2014-04-15 Thread Ian Ferreira
This would be super useful. Thanks.

On 4/15/14, 1:30 AM, Jeremy Freeman freeman.jer...@gmail.com wrote:

Hi Andrew,

I'm putting together some benchmarks for PySpark vs Scala. I'm focusing on
ML algorithms, as I'm particularly curious about the relative performance
of
MLlib in Scala vs the Python MLlib API vs pure Python implementations.

Will share real results as soon as I have them, but roughly, in our hands,
that 40% number is ballpark correct, at least for some basic operations
(e.g
textFile, count, reduce).

-- Jeremy

-
Jeremy Freeman, PhD
Neuroscientist
@thefreemanlab



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Scala-vs-Python-perfor
mance-differences-tp4247p4261.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




partitioning of small data sets

2014-04-15 Thread Diana Carroll
I loaded a very tiny file into Spark -- 23 lines of text, 2.6kb

Given the size, and that it is a single file, I assumed it would only be in
a single partition.  But when I cache it,  I can see in the Spark App UI
that it actually splits it into two partitions:

[image: Inline image 1]

Is this correct behavior?  How does Spark decide how big a partition should
be, or how many partitions to create for an RDD?

If it matters, I have only a single worker in my cluster, so both
partitions are stored on the same worker.

The file was on HDFS and was only a single block.

Thanks for any insight.

Diana
inline: sparkdev_2014-04-11.png

Re: Spark resilience

2014-04-15 Thread Manoj Samel
Thanks Aaron, this is useful !

- Manoj


On Mon, Apr 14, 2014 at 8:12 PM, Aaron Davidson ilike...@gmail.com wrote:

 Launching drivers inside the cluster was a feature added in 0.9, for
 standalone cluster mode:
 http://spark.apache.org/docs/latest/spark-standalone.html#launching-applications-inside-the-cluster

 Note the supervise flag, which will cause the driver to be restarted if
 it fails. This is a rather low-level mechanism which by default will just
 cause the whole job to rerun from the beginning. Special recovery would
 have to be implemented by hand, via some sort of state checkpointing into a
 globally visible storage system (e.g., HDFS), which, for example, Spark
 Streaming already does.

 Currently, this feature is not supported in YARN or Mesos fine-grained
 mode.


 On Mon, Apr 14, 2014 at 2:08 PM, Manoj Samel manojsamelt...@gmail.com wrote:

 Could you please elaborate how drivers can be restarted automatically ?

 Thanks,


 On Mon, Apr 14, 2014 at 10:30 AM, Aaron Davidson ilike...@gmail.com wrote:

 Master and slave are somewhat overloaded terms in the Spark ecosystem
 (see the glossary:
 http://spark.apache.org/docs/latest/cluster-overview.html#glossary).
 Are you actually asking about the Spark driver and executors, or the
 standalone cluster master and workers?

 To briefly answer for either possibility:
 (1) Drivers are not fault tolerant but can be restarted automatically,
 Executors may be removed at any point without failing the job (though
 losing an Executor may slow the job significantly), and Executors may be
 added at any point and will be immediately used.
 (2) Standalone cluster Masters are fault tolerant and failure will only
 temporarily stall new jobs from starting or getting new resources, but does
 not affect currently-running jobs. Workers can fail and will simply cause
 jobs to lose their current Executors. New Workers can be added at any point.



 On Mon, Apr 14, 2014 at 11:00 AM, Ian Ferreira 
 ianferre...@hotmail.com wrote:

 Folks,

  I was wondering what the failure support modes were for Spark while
  running jobs:


1. What happens when a master fails
2. What happens when a slave fails
3. Can you mid job add and remove slaves


  Regarding the install on Mesos, if I understand correctly the Spark
 master is behind a Zookeeper quorum so that isolates the slaves from a
 master failure, but what about the masters behind quorum?

 Cheers
 - Ian







Re: storage.MemoryStore estimated size 7 times larger than real

2014-04-15 Thread Aaron Davidson
Ah, I think I can see where your issue may be coming from. In spark-shell,
the MASTER is local[*], which just means it uses a pre-set number of
cores. This distinction only matters because the default number of slices
created from sc.parallelize() is based on the number of cores.

So when you run from sbt, you probably use a SparkContext with a local
master, which sets number of cores to 1, meaning you are doing
sc.parallelize(array, 1)

while in Spark Shell you are doing
sc.parallelize(array, 6ish?)

The difference between the two is just that the array is broken up into
more parts in the latter, so you will store blocks for rdd_0_0, rdd_0_1,
..., rdd_0_5 rather than just one (large) block. In both cases, though, I
suspect that the total size is around the same, at around 28 MB.

In my case, where I have an RDD[Array[Int]], I have 8 partitions (a number
I just chose randomly), and each one is 512 KB, so the total size is
actually 4 MB. You could do the same test with numSlices = 1, and you'd
just have a single 4 MB block.

The reason our two solutions produced different total memory values is
because of Java primitive boxing [1]. In your case, your RDD[Int] is
converted into an Array[Any] right before being stored into memory, which
causes it to be effectively an Array[java.lang.Integer] [2]. In my case,
the actual values inside the RDD are primitive arrays, so they cannot be
broken up. Spark still converts my RDD[Array[Int]] into an Array[Any], but
Array[Int] is already an Any, so there's no memory impact here.

[1] http://docs.oracle.com/javase/tutorial/java/data/autoboxing.html
[2]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L90
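
A tiny plain-Scala illustration of the boxing difference (nothing here is from your
job; it just mirrors what happens when a partition is turned into an Array[Any]):

val unboxed = new Array[Int](1024)             // primitive ints, 4 bytes of payload each
val boxed: Array[Any] = unboxed.toArray[Any]   // each element is now a java.lang.Integer object
println(boxed(0).getClass)                     // class java.lang.Integer
val nested: Array[Any] = Array[Any](unboxed)   // an Array[Int] is itself a single Any,
                                               // so its primitives stay packed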



On Tue, Apr 15, 2014 at 3:58 AM, wxhsdp wxh...@gmail.com wrote:

 sorry, Davidson, i don't catch the point. what's the essential difference
 between our code?
 /*my code*/
 val array = new Array[Int](size)
 val a = sc.parallelize(array).cache() /*4MB*/

 /*your code*/
 val numSlices = 8
 val arr = Array.fill[Array[Int]](numSlices) { new Array[Int](size /
 numSlices) }
 val rdd = sc.parallelize(arr, numSlices).cache()

 i'm in local mode, with only one partition; it's just an RDD of one
 partition with the type RDD[Int].
 your RDD has 8 partitions with the type RDD[Array[Int]], does that matter?
 my question is why the memory usage is 7x in sbt, but right in spark shell?

 as to the following question, i made a mistake, sorry



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/storage-MemoryStore-estimated-size-7-times-larger-than-real-tp4251p4269.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: partitioning of small data sets

2014-04-15 Thread Aaron Davidson
Take a look at the minSplits argument for SparkContext#textFile [1] -- the
default value is 2. You can simply set this to 1 if you'd prefer not to
split your data.

[1]
http://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.SparkContext
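
For example, a minimal sketch (the path is only a placeholder):

val oneSplit = sc.textFile("hdfs:///path/to/tiny-file.txt", 1)  // minSplits = 1: keep a single partition
println(oneSplit.partitions.length)                             // 1
val default = sc.textFile("hdfs:///path/to/tiny-file.txt")      // default minSplits = 2
println(default.partitions.length)                              // typically 2, even for a one-block file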


On Tue, Apr 15, 2014 at 8:44 AM, Diana Carroll dcarr...@cloudera.com wrote:

 I loaded a very tiny file into Spark -- 23 lines of text, 2.6kb

 Given the size, and that it is a single file, I assumed it would only be
 in a single partition.  But when I cache it,  I can see in the Spark App UI
 that it actually splits it into two partitions:

 [image: Inline image 1]

 Is this correct behavior?  How does Spark decide how big a partition
 should be, or how many partitions to create for an RDD.

 If it matters, I have only a single worker in my cluster, so both
 partitions are stored on the same worker.

 The file was on HDFS and was only a single block.

 Thanks for any insight.

 Diana



inline: sparkdev_2014-04-11.png

Why these operations are slower than the equivalent on Hadoop?

2014-04-15 Thread Yanzhe Chen
Hi all,  

As in a previous thread, I am asking how to implement a divide-and-conquer
algorithm (skyline) in Spark.
Here is my current solution:

val data = sc.textFile(…).map(line => line.split(",").map(_.toDouble))

val result = data.mapPartitions(points =>
                 skyline(points.toArray).iterator).coalesce(1, true)
             .mapPartitions(points =>
                 skyline(points.toArray).iterator).collect()

where skyline is a local algorithm to compute the results:

def skyline(points: Array[Point]): Array[Point]

Basically, I find this implementation runs slower than the corresponding Hadoop
version (the identity map phase plus local skyline for both combine and reduce
phases).

Below are my questions:

1. Why is this implementation much slower than the Hadoop one?

I can find two possible reasons: one is the shuffle overhead in coalesce, 
another is calling the toArray and iterator repeatedly when invoking local 
skyline algorithm. But I am not sure which one is true.

2. One observation is that while Hadoop version almost used up all the CPU 
resources during execution, the CPU seems not that hot on Spark. Is that a clue 
to prove that the shuffling might be the real bottleneck?

3. Is there any difference between coalesce(1, true) and repartition? It seems
that both operations need to shuffle data. What are the proper situations for using
the coalesce method?

4. More generally, I am trying to implement some core geometry computation
operators on Spark (like skyline, convex hull etc). In my understanding, since 
Spark is more capable of handling iterative computations on dataset, the above 
solution apparently doesn’t exploit what Spark is good at. Any comments on how 
to do geometry computations on Spark (if it is possible) ?

Thanks for any insight.

Yanzhe



Streaming job having Cassandra query : OutOfMemoryError

2014-04-15 Thread sonyjv
Hi All,

I am desperately looking for some help.

My cluster is 6 nodes having dual core and 8GB ram each. Spark version
running on the cluster is spark-0.9.0-incubating-bin-cdh4.

I am getting OutOfMemoryError when running a Spark Streaming job
(non-streaming version works fine) which queries Cassandra table (simple
query returning 3-4 rows) by connecting to the Spark standalone cluster
master. 

java.lang.OutOfMemoryError: Java heap space
at
org.apache.hadoop.io.WritableUtils.readCompressedByteArray(WritableUtils.java:38)
at
org.apache.hadoop.io.WritableUtils.readCompressedString(WritableUtils.java:87)
at
org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:185)
at 
org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)
at 
org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
at 
org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
at
org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at 
org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
at
org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
Apr 15, 2014 6:53:39 PM org.apache.spark.Logging$class logInfo

Spark job dependencies are 

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.3</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>0.9.0-incubating</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>0.9.0-incubating</version>
</dependency>
<dependency>
  <groupId>org.apache.cassandra</groupId>
  <artifactId>cassandra-all</artifactId>
  <version>2.0.6</version>
</dependency>
<dependency>
  <groupId>com.tuplejump</groupId>
  <artifactId>calliope_2.10</artifactId>
  <version>0.9.0-U1-C2-EA</version>
</dependency>

Various memory variables are configured as below. 
spark.executor.memory = 4g
SPARK_MEM = 2g
SPARK_WORKER_MEMORY = 4g

Can you please let me know where I am going wrong?

Thanks,
Sony





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-job-having-Cassandra-query-OutOfMemoryError-tp4280.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: partitioning of small data sets

2014-04-15 Thread Matei Zaharia
Yup, one reason it’s 2 actually is to give people a similar experience to 
working with large files, in case their code doesn’t deal well with the file 
being partitioned.

Matei

On Apr 15, 2014, at 9:53 AM, Aaron Davidson ilike...@gmail.com wrote:

 Take a look at the minSplits argument for SparkContext#textFile [1] -- the 
 default value is 2. You can simply set this to 1 if you'd prefer not to split 
 your data.
 
 [1] 
 http://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.SparkContext
 
 
 On Tue, Apr 15, 2014 at 8:44 AM, Diana Carroll dcarr...@cloudera.com wrote:
 I loaded a very tiny file into Spark -- 23 lines of text, 2.6kb
 
 Given the size, and that it is a single file, I assumed it would only be in a 
 single partition.  But when I cache it,  I can see in the Spark App UI that 
 it actually splits it into two partitions:
 
 sparkdev_2014-04-11.png
 
 Is this correct behavior?  How does Spark decide how big a partition should 
 be, or how many partitions to create for an RDD.
 
 If it matters, I have only a single worker in my cluster, so both 
 partitions are stored on the same worker.
 
 The file was on HDFS and was only a single block.
 
 Thanks for any insight.
 
 Diana
 
 
 



scheduler question

2014-04-15 Thread Mohit Jaggi
Hi Folks,
I have some questions about how Spark scheduler works:
- How does Spark know how many resources a job might need?
- How does it fairly share resources between multiple jobs?
- Does it know about data and partition sizes and use that information
for scheduling?

Mohit.


Re: Why these operations are slower than the equivalent on Hadoop?

2014-04-15 Thread Cheng Lian
Your Spark solution first reduces partial results into a single partition,
computes the final result, and then collects to the driver side. This
involves a shuffle and two waves of network traffic. Instead, you can
directly collect partial results to the driver and then computes the final
results on driver side:

val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
val partialResults = data.mapPartitions(points => skyline(points.toArray).iterator).collect()
val results = skyline(partialResults)

On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen yanzhe...@gmail.com wrote:

 Hi all,

 As a previous thread, I am asking how to implement a divide-and-conquer
 algorithm (skyline) in Spark.
 Here is my current solution:

  val data = sc.textFile(…).map(line => line.split(",").map(_.toDouble))

  val result = data.mapPartitions(points =>
                   skyline(points.toArray).iterator).coalesce(1, true)
               .mapPartitions(points =>
                   skyline(points.toArray).iterator).collect()

  where skyline is a local algorithm to compute the results:

  def skyline(points: Array[Point]): Array[Point]

 Basically, I find this implement runs slower than the corresponding Hadoop
 version (the identity map phase plus local skyline for both combine and
 reduce phases).

 Below are my questions:

 1. Why this implementation is much slower than the Hadoop one?

 I can find two possible reasons: one is the shuffle overhead in coalesce,
 another is calling the toArray and iterator repeatedly when invoking
 local skyline algorithm. But I am not sure which one is true.

I haven’t seen your Hadoop version. But if this assumption is right, the
above version should help.


 2. One observation is that while Hadoop version almost used up all the CPU
 resources during execution, the CPU seems not that hot on Spark. Is that a
 clue to prove that the shuffling might be the real bottleneck?

How many parallel tasks are there when running your Spark code? I doubt
tasks are queued and run sequentially.


  3. Is there any difference between coalesce(1, true) and repartition? It
  seems that both operations need to shuffle data. What are the proper
  situations for using the coalesce method?

repartition(n) is just an alias of coalesce(n, true), so yes, they both
involve data shuffling. coalesce can be used to shrink partition number
when dataset size shrinks dramatically after operations like filter. Say
you have an RDD containing 1TB of data with 100 partitions, after a
.filter(...) call, only 20GB data left, then you may want to coalesce to 2
partitions rather than 100.
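
A minimal sketch of that filter-then-coalesce pattern (the path and filter are just
placeholders):

val big = sc.textFile("hdfs:///some/large/input", 100)  // ~1TB spread over 100 partitions
val small = big.filter(_.contains("ERROR"))             // suppose only ~20GB survives
               .coalesce(2)                             // shrink to 2 partitions; no shuffle needed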


 4. More generally, I am trying to implementing some core geometry
 computation operators on Spark (like skyline, convex hull etc). In my
 understanding, since Spark is more capable of handling iterative
 computations on dataset, the above solution apparently doesn’t exploit what
 Spark is good at. Any comments on how to do geometry computations on Spark
 (if it is possible) ?

Although Spark is good at iterative algorithms, it also performs better in
batch computing due to much lower scheduling overhead and thread level
parallelism. Theoretically, you can always accelerate your MapReduce job by
rewriting it in Spark.


 Thanks for any insight.

 Yanzhe




Re: How to cogroup/join pair RDDs with different key types?

2014-04-15 Thread Roger Hoover
Andrew,

Thank you very much for your feedback.  Unfortunately, the ranges are not
of predictable size but you gave me an idea of how to handle it.  Here's
what I'm thinking:

1. Choose number of partitions, n, over IP space
2. Preprocess the IPRanges, splitting any of them that cross partition
boundaries
3. Partition ipToUrl and the new ipRangeToZip according to the partitioning
scheme from step 1
4. Join matching partitions of these two RDDs

I still don't know how to do step 4 though.  I see that RDDs have a
mapPartitions() operation to let you do whatever you want with a partition.
 What I need is a way to get my hands on two partitions at once, each from
different RDDs.

Any ideas?

Thanks,

Roger


On Mon, Apr 14, 2014 at 5:45 PM, Andrew Ash and...@andrewash.com wrote:

 Are your IPRanges all on nice, even CIDR-format ranges?  E.g.
 192.168.0.0/16 or 10.0.0.0/8?

 If the range is always an even subnet mask and not split across subnets,
 I'd recommend flatMapping the ipToUrl RDD to (IPRange, String) and then
 joining the two RDDs.  The expansion would be at most 32x if all your
 ranges can be expressed in CIDR notation, and in practice would be much
 smaller than that (typically you don't need things bigger than a /8 and
 often not smaller than a /24)

 Hopefully you can use your knowledge of the ip ranges to make this
 feasible.

 Otherwise, you could additionally flatmap the ipRangeToZip out to a list
 of CIDR notations and do the join then, but you're starting to have the
 cartesian product work against you on scale at that point.

 Andrew


  On Tue, Apr 15, 2014 at 1:07 AM, Roger Hoover roger.hoo...@gmail.com wrote:

 Hi,

 I'm trying to figure out how to join two RDDs with different key types
 and appreciate any suggestions.

 Say I have two RDDS:
 ipToUrl of type (IP, String)
 ipRangeToZip of type (IPRange, String)

 How can I join/cogroup these two RDDs together to produce a new RDD of
 type (IP, (String, String)) where IP is the key and the values are the urls
 and zipcodes?

 Say I have a method on the IPRange class called matches(ip: IP), I want
 the joined records to match when ipRange.matches(ip).

 Thanks,

 Roger





Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1

2014-04-15 Thread anant
I've received the same error with Spark built using Maven. It turns out that
mesos-0.13.0 depends on protobuf-2.4.1 which is causing the clash at
runtime. Protobuf included by Akka is shaded and doesn't cause any problems.

The solution is to update the mesos dependency to 0.18.0 in spark's pom.xml.
Rebuilding the JAR with this configuration solves the issue.
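
For reference, a sketch of what the updated dependency in spark's pom.xml would look
like (assuming the usual org.apache.mesos coordinates):

<dependency>
  <groupId>org.apache.mesos</groupId>
  <artifactId>mesos</artifactId>
  <version>0.18.0</version>
</dependency>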

-Anant



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-reading-HDFS-file-using-spark-0-9-0-hadoop-2-2-0-incompatible-protobuf-2-5-and-2-4-1-tp2158p4286.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to cogroup/join pair RDDs with different key types?

2014-04-15 Thread Roger Hoover
I'm thinking of creating a union type for the key so that IPRange and IP
types can be joined.
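
A minimal sketch of what such a union key could look like (these types and names are
hypothetical, not from any existing library):

sealed trait IPKey
case class SingleIP(ip: Long) extends IPKey                // an IPv4 address packed into a Long
case class RangeKey(start: Long, end: Long) extends IPKey  // an IP range by start/end address

Both RDDs could then be re-keyed to IPKey and partitioned with a common custom
Partitioner before being cogrouped.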


On Tue, Apr 15, 2014 at 10:44 AM, Roger Hoover roger.hoo...@gmail.com wrote:

 Andrew,

 Thank you very much for your feedback.  Unfortunately, the ranges are not
 of predictable size but you gave me an idea of how to handle it.  Here's
 what I'm thinking:

 1. Choose number of partitions, n, over IP space
 2. Preprocess the IPRanges, splitting any of them that cross partition
 boundaries
 3. Partition ipToUrl and the new ipRangeToZip according to the
 partitioning scheme from step 1
 4. Join matching partitions of these two RDDs

 I still don't know how to do step 4 though.  I see that RDDs have a
 mapPartitions() operation to let you do whatever you want with a partition.
  What I need is a way to get my hands on two partitions at once, each from
 different RDDs.

 Any ideas?

 Thanks,

 Roger


 On Mon, Apr 14, 2014 at 5:45 PM, Andrew Ash and...@andrewash.com wrote:

 Are your IPRanges all on nice, even CIDR-format ranges?  E.g.
 192.168.0.0/16 or 10.0.0.0/8?

 If the range is always an even subnet mask and not split across subnets,
 I'd recommend flatMapping the ipToUrl RDD to (IPRange, String) and then
 joining the two RDDs.  The expansion would be at most 32x if all your
 ranges can be expressed in CIDR notation, and in practice would be much
 smaller than that (typically you don't need things bigger than a /8 and
 often not smaller than a /24)

 Hopefully you can use your knowledge of the ip ranges to make this
 feasible.

 Otherwise, you could additionally flatmap the ipRangeToZip out to a list
 of CIDR notations and do the join then, but you're starting to have the
 cartesian product work against you on scale at that point.

 Andrew


  On Tue, Apr 15, 2014 at 1:07 AM, Roger Hoover roger.hoo...@gmail.com wrote:

 Hi,

 I'm trying to figure out how to join two RDDs with different key types
 and appreciate any suggestions.

 Say I have two RDDS:
 ipToUrl of type (IP, String)
 ipRangeToZip of type (IPRange, String)

 How can I join/cogroup these two RDDs together to produce a new RDD of
 type (IP, (String, String)) where IP is the key and the values are the urls
 and zipcodes?

 Say I have a method on the IPRange class called matches(ip: IP), I want
 the joined records to match when ipRange.matches(ip).

 Thanks,

 Roger






Re: can't sc.paralellize in Spark 0.7.3 spark-shell

2014-04-15 Thread Walrus theCat
Actually altering the classpath in the REPL causes the provided
SparkContext to disappear:

scala sc.parallelize(List(1,2,3))
res0: spark.RDD[Int] = ParallelCollectionRDD[0] at parallelize at
console:13

scala :cp /root
Added '/root'.  Your new classpath is:
:/root/jars/aspectjrt.jar:/root/jars/aspectjweaver.jar:/root/jars/aws-java-sdk-1.4.5.jar:/root/jars/aws-java-sdk-1.4.5-javadoc.jar:/root/jars/aws-java-sdk-1.4.5-sources.jar:/root/jars/aws-java-sdk-flow-build-tools-1.4.5.jar:/root/jars/commons-codec-1.3.jar:/root/jars/commons-logging-1.1.1.jar:/root/jars/freemarker-2.3.18.jar:/root/jars/httpclient-4.1.1.jar:/root/jars/httpcore-4.1.jar:/root/jars/jackson-core-asl-1.8.7.jar:/root/jars/mail-1.4.3.jar:/root/jars/spring-beans-3.0.7.jar:/root/jars/spring-context-3.0.7.jar:/root/jars/spring-core-3.0.7.jar:/root/jars/stax-1.2.0.jar:/root/jars/stax-api-1.0.1.jar:/root/spark/conf:/root/spark/core/target/scala-2.9.3/classes:/root/spark/core/src/main/resources:/root/spark/repl/target/scala-2.9.3/classes:/root/spark/examples/target/scala-2.9.3/classes:/root/spark/streaming/target/scala-2.9.3/classes:/root/spark/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/*:/root/spark/lib_managed/jars/*:/root/spark/lib_managed/bundles/*:/root/spark/repl/lib/*:/root/spark/bagel/target/scala-2.9.3/classes:/root/spark/python/lib/py4j0.7.jar:/root
14/04/15 18:19:37 INFO server.Server: jetty-7.6.8.v20121106
14/04/15 18:19:37 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:48978
Replaying: sc.parallelize(List(1,2,3))
console:8: error: not found: value sc
   sc.parallelize(List(1,2,3))



On Mon, Apr 14, 2014 at 7:51 PM, Walrus theCat walrusthe...@gmail.com wrote:

 Nevermind -- I'm like 90% sure the problem is that I'm importing stuff
 that declares a SparkContext as sc.  If it's not, I'll report back.


  On Mon, Apr 14, 2014 at 2:55 PM, Walrus theCat walrusthe...@gmail.com wrote:

 Hi,

 Using the spark-shell, I can't sc.parallelize to get an RDD.

 Looks like a bug.

 scala sc.parallelize(Array(a,s,d))
 java.lang.NullPointerException
 at init(console:17)
 at init(console:22)
 at init(console:24)
 at init(console:26)
 at init(console:28)
 at init(console:30)
 at init(console:32)
 at init(console:34)
 at init(console:36)
 at .init(console:40)
 at .clinit(console)
 at .init(console:11)
 at .clinit(console)
 at $export(console)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:629)
 at
 spark.repl.SparkIMain$Request$$anonfun$10.apply(SparkIMain.scala:890)
 at
 scala.tools.nsc.interpreter.Line$$anonfun$1.apply$mcV$sp(Line.scala:43)
 at scala.tools.nsc.io.package$$anon$2.run(package.scala:25)
 at java.lang.Thread.run(Thread.java:744)





Can't run a simple spark application with 0.9.1

2014-04-15 Thread Paul Schooss
Hello,

Currently I deployed 0.9.1 spark using a new way of starting up spark

exec start-stop-daemon --start --pidfile /var/run/spark.pid
--make-pidfile --chuid ${SPARK_USER}:${SPARK_GROUP} --chdir ${SPARK_HOME}
--exec /usr/bin/java -- -cp ${CLASSPATH}
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.port=10111
-Dspark.akka.logLifecycleEvents=true -Djava.library.path=
-XX:ReservedCodeCacheSize=512M -XX:+UseCodeCacheFlushing
-XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC
-Dspark.executor.memory=10G -Xmx10g ${MAIN_CLASS} ${MAIN_CLASS_ARGS}


where class path points to the spark jar that we compile with sbt. When I
try to run a job I receive the following warning

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check
your cluster UI to ensure that workers are registered and have sufficient
memory


My first question is do I need the entire spark project on disk in order to
run jobs? Or what else am I doing wrong?


Re: Why these operations are slower than the equivalent on Hadoop?

2014-04-15 Thread Eugen Cepoi
It depends on your algorithm but I guess that you probably should use
reduce (the code probably doesn't compile but it shows you the idea).

val result = data.reduce { case (left, right) =>
  skyline(left ++ right)
}

Or, in case you want to merge the result of one partition with another, you could do:

val result = data.mapPartitions { points =>
  // transforms the whole partition into a single element,
  // but this may incur some other problems, especially if you use Kryo serialization...
  Seq(skyline(points.toArray)).iterator
}.reduce { case (left, right) =>
  skyline(left ++ right)
}




2014-04-15 19:37 GMT+02:00 Cheng Lian lian.cs@gmail.com:

 Your Spark solution first reduces partial results into a single partition,
 computes the final result, and then collects to the driver side. This
 involves a shuffle and two waves of network traffic. Instead, you can
 directly collect partial results to the driver and then computes the final
 results on driver side:

  val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
  val partialResults = data.mapPartitions(points => skyline(points.toArray).iterator).collect()
  val results = skyline(partialResults)

 On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen yanzhe...@gmail.com wrote:

  Hi all,

 As a previous thread, I am asking how to implement a divide-and-conquer
 algorithm (skyline) in Spark.
 Here is my current solution:

  val data = sc.textFile(…).map(line => line.split(",").map(_.toDouble))

  val result = data.mapPartitions(points =>
                   skyline(points.toArray).iterator).coalesce(1, true)
               .mapPartitions(points =>
                   skyline(points.toArray).iterator).collect()

  where skyline is a local algorithm to compute the results:

  def skyline(points: Array[Point]): Array[Point]

 Basically, I find this implement runs slower than the corresponding
 Hadoop version (the identity map phase plus local skyline for both combine
 and reduce phases).

 Below are my questions:

 1. Why this implementation is much slower than the Hadoop one?

 I can find two possible reasons: one is the shuffle overhead in coalesce,
 another is calling the toArray and iterator repeatedly when invoking
 local skyline algorithm. But I am not sure which one is true.

 I haven’t seen your Hadoop version. But if this assumption is right, the
 above version should help.


 2. One observation is that while Hadoop version almost used up all the
 CPU resources during execution, the CPU seems not that hot on Spark. Is
 that a clue to prove that the shuffling might be the real bottleneck?

 How many parallel tasks are there when running your Spark code? I doubt
 tasks are queued and run sequentially.


  3. Is there any difference between coalesce(1, true) and repartition? It
  seems that both operations need to shuffle data. What are the proper
  situations for using the coalesce method?

 repartition(n) is just an alias of coalesce(n, true), so yes, they both
 involve data shuffling. coalesce can be used to shrink partition number
 when dataset size shrinks dramatically after operations like filter. Say
 you have an RDD containing 1TB of data with 100 partitions, after a
 .filter(...) call, only 20GB data left, then you may want to coalesce to
 2 partitions rather than 100.


 4. More generally, I am trying to implementing some core geometry
 computation operators on Spark (like skyline, convex hull etc). In my
 understanding, since Spark is more capable of handling iterative
 computations on dataset, the above solution apparently doesn’t exploit what
 Spark is good at. Any comments on how to do geometry computations on Spark
 (if it is possible) ?

 Although Spark is good at iterative algorithms, it also performs better in
 batch computing due to much lower scheduling overhead and thread level
 parallelism. Theoretically, you can always accelerate your MapReduce job by
 rewriting it in Spark.


 Thanks for any insight.

 Yanzhe




Re: can't sc.paralellize in Spark 0.7.3 spark-shell

2014-04-15 Thread Aaron Davidson
This is probably related to the Scala bug that :cp does not work:
https://issues.scala-lang.org/browse/SI-6502


On Tue, Apr 15, 2014 at 11:21 AM, Walrus theCat walrusthe...@gmail.com wrote:

 Actually altering the classpath in the REPL causes the provided
 SparkContext to disappear:

 scala sc.parallelize(List(1,2,3))
 res0: spark.RDD[Int] = ParallelCollectionRDD[0] at parallelize at
 console:13

 scala :cp /root
 Added '/root'.  Your new classpath is:

 :/root/jars/aspectjrt.jar:/root/jars/aspectjweaver.jar:/root/jars/aws-java-sdk-1.4.5.jar:/root/jars/aws-java-sdk-1.4.5-javadoc.jar:/root/jars/aws-java-sdk-1.4.5-sources.jar:/root/jars/aws-java-sdk-flow-build-tools-1.4.5.jar:/root/jars/commons-codec-1.3.jar:/root/jars/commons-logging-1.1.1.jar:/root/jars/freemarker-2.3.18.jar:/root/jars/httpclient-4.1.1.jar:/root/jars/httpcore-4.1.jar:/root/jars/jackson-core-asl-1.8.7.jar:/root/jars/mail-1.4.3.jar:/root/jars/spring-beans-3.0.7.jar:/root/jars/spring-context-3.0.7.jar:/root/jars/spring-core-3.0.7.jar:/root/jars/stax-1.2.0.jar:/root/jars/stax-api-1.0.1.jar:/root/spark/conf:/root/spark/core/target/scala-2.9.3/classes:/root/spark/core/src/main/resources:/root/spark/repl/target/scala-2.9.3/classes:/root/spark/examples/target/scala-2.9.3/classes:/root/spark/streaming/target/scala-2.9.3/classes:/root/spark/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/*:/root/spark/lib_managed/jars/*:/root/spark/lib_managed/bundles/*:/root/spark/repl/lib/*:/root/spark/bagel/target/scala-2.9.3/classes:/root/spark/python/lib/py4j0.7.jar:/root
 14/04/15 18:19:37 INFO server.Server: jetty-7.6.8.v20121106
 14/04/15 18:19:37 INFO server.AbstractConnector: Started
 SocketConnector@0.0.0.0:48978
 Replaying: sc.parallelize(List(1,2,3))
 console:8: error: not found: value sc
sc.parallelize(List(1,2,3))



  On Mon, Apr 14, 2014 at 7:51 PM, Walrus theCat walrusthe...@gmail.com wrote:

 Nevermind -- I'm like 90% sure the problem is that I'm importing stuff
 that declares a SparkContext as sc.  If it's not, I'll report back.


  On Mon, Apr 14, 2014 at 2:55 PM, Walrus theCat walrusthe...@gmail.com wrote:

 Hi,

 Using the spark-shell, I can't sc.parallelize to get an RDD.

 Looks like a bug.

 scala sc.parallelize(Array(a,s,d))
 java.lang.NullPointerException
 at init(console:17)
 at init(console:22)
 at init(console:24)
 at init(console:26)
 at init(console:28)
 at init(console:30)
 at init(console:32)
 at init(console:34)
 at init(console:36)
 at .init(console:40)
 at .clinit(console)
 at .init(console:11)
 at .clinit(console)
 at $export(console)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:629)
 at
 spark.repl.SparkIMain$Request$$anonfun$10.apply(SparkIMain.scala:890)
 at
 scala.tools.nsc.interpreter.Line$$anonfun$1.apply$mcV$sp(Line.scala:43)
 at scala.tools.nsc.io.package$$anon$2.run(package.scala:25)
 at java.lang.Thread.run(Thread.java:744)






Re: can't sc.paralellize in Spark 0.7.3 spark-shell

2014-04-15 Thread Walrus theCat
Thank you very much!


On Tue, Apr 15, 2014 at 11:29 AM, Aaron Davidson ilike...@gmail.com wrote:

 This is probably related to the Scala bug that :cp does not work:
 https://issues.scala-lang.org/browse/SI-6502


  On Tue, Apr 15, 2014 at 11:21 AM, Walrus theCat walrusthe...@gmail.com wrote:

 Actually altering the classpath in the REPL causes the provided
 SparkContext to disappear:

 scala sc.parallelize(List(1,2,3))
 res0: spark.RDD[Int] = ParallelCollectionRDD[0] at parallelize at
 console:13

 scala :cp /root
 Added '/root'.  Your new classpath is:

 :/root/jars/aspectjrt.jar:/root/jars/aspectjweaver.jar:/root/jars/aws-java-sdk-1.4.5.jar:/root/jars/aws-java-sdk-1.4.5-javadoc.jar:/root/jars/aws-java-sdk-1.4.5-sources.jar:/root/jars/aws-java-sdk-flow-build-tools-1.4.5.jar:/root/jars/commons-codec-1.3.jar:/root/jars/commons-logging-1.1.1.jar:/root/jars/freemarker-2.3.18.jar:/root/jars/httpclient-4.1.1.jar:/root/jars/httpcore-4.1.jar:/root/jars/jackson-core-asl-1.8.7.jar:/root/jars/mail-1.4.3.jar:/root/jars/spring-beans-3.0.7.jar:/root/jars/spring-context-3.0.7.jar:/root/jars/spring-core-3.0.7.jar:/root/jars/stax-1.2.0.jar:/root/jars/stax-api-1.0.1.jar:/root/spark/conf:/root/spark/core/target/scala-2.9.3/classes:/root/spark/core/src/main/resources:/root/spark/repl/target/scala-2.9.3/classes:/root/spark/examples/target/scala-2.9.3/classes:/root/spark/streaming/target/scala-2.9.3/classes:/root/spark/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/*:/root/spark/lib_managed/jars/*:/root/spark/lib_managed/bundles/*:/root/spark/repl/lib/*:/root/spark/bagel/target/scala-2.9.3/classes:/root/spark/python/lib/py4j0.7.jar:/root
 14/04/15 18:19:37 INFO server.Server: jetty-7.6.8.v20121106
 14/04/15 18:19:37 INFO server.AbstractConnector: Started
 SocketConnector@0.0.0.0:48978
 Replaying: sc.parallelize(List(1,2,3))
 console:8: error: not found: value sc
sc.parallelize(List(1,2,3))



  On Mon, Apr 14, 2014 at 7:51 PM, Walrus theCat walrusthe...@gmail.com wrote:

 Nevermind -- I'm like 90% sure the problem is that I'm importing stuff
 that declares a SparkContext as sc.  If it's not, I'll report back.


 On Mon, Apr 14, 2014 at 2:55 PM, Walrus theCat 
  walrusthe...@gmail.com wrote:

 Hi,

 Using the spark-shell, I can't sc.parallelize to get an RDD.

 Looks like a bug.

 scala sc.parallelize(Array(a,s,d))
 java.lang.NullPointerException
 at init(console:17)
 at init(console:22)
 at init(console:24)
 at init(console:26)
 at init(console:28)
 at init(console:30)
 at init(console:32)
 at init(console:34)
 at init(console:36)
 at .init(console:40)
 at .clinit(console)
 at .init(console:11)
 at .clinit(console)
 at $export(console)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:629)
 at
 spark.repl.SparkIMain$Request$$anonfun$10.apply(SparkIMain.scala:890)
 at
 scala.tools.nsc.interpreter.Line$$anonfun$1.apply$mcV$sp(Line.scala:43)
 at scala.tools.nsc.io.package$$anon$2.run(package.scala:25)
 at java.lang.Thread.run(Thread.java:744)







Shark: class java.io.IOException: Cannot run program /bin/java

2014-04-15 Thread ge ko
Hi,



after starting the shark-shell
via /opt/shark/shark-0.9.1/bin/shark-withinfo -skipRddReload I receive lots
of output, including the exception that /bin/java cannot be executed. But
it is linked to /usr/bin/java ?!?!



root#ls -al /bin/java

lrwxrwxrwx 1 root root 13 15. Apr 21:45 /bin/java -> /usr/bin/java

root#/bin/java -version

java version "1.7.0_51"
OpenJDK Runtime Environment (rhel-2.4.4.1.el6_5-x86_64 u51-b02)
OpenJDK 64-Bit Server VM (build 24.45-b08, mixed mode)



Starting the shark shell:



[root@hadoop-pg-5 bin]# /opt/shark/shark-0.9.1/bin/shark-withinfo
-skipRddReload
-hiveconf hive.root.logger=INFO,console -skipRddReload
Starting the Shark Command Line Client
14/04/15 21:45:57 WARN conf.HiveConf: DEPRECATED: Configuration property
hive.metastore.local no longer has any effect. Make sure to provide a valid
value for hive.metastore.uris if you are connecting to a remote metastore.
14/04/15 21:45:58 WARN conf.HiveConf: DEPRECATED: Configuration property
hive.metastore.local no longer has any effect. Make sure to provide a valid
value for hive.metastore.uris if you are connecting to a remote metastore.

Logging initialized using configuration in
jar:file:/opt/shark/shark-0.9.1/lib_managed/jars/edu.berkeley.cs.shark/hive-common/hive-common-0.11.0-shark-0.9.1.jar!/hive-log4j.properties
14/04/15 21:45:58 INFO SessionState:
Logging initialized using configuration in
jar:file:/opt/shark/shark-0.9.1/lib_managed/jars/edu.berkeley.cs.shark/hive-common/hive-common-0.11.0-shark-0.9.1.jar!/hive-log4j.properties
Hive history
file=/tmp/root/hive_job_log_root_22574@hadoop-pg-5.cluster_201404152145_159664609.txt
14/04/15 21:45:58 INFO exec.HiveHistory: Hive history
file=/tmp/root/hive_job_log_root_22574@hadoop-pg-5.cluster_201404152145_159664609.txt
14/04/15 21:45:58 WARN conf.HiveConf: DEPRECATED: Configuration property
hive.metastore.local no longer has any effect. Make sure to provide a valid
value for hive.metastore.uris if you are connecting to a remote metastore.
14/04/15 21:45:59 WARN conf.HiveConf: DEPRECATED: Configuration property
hive.metastore.local no longer has any effect. Make sure to provide a valid
value for hive.metastore.uris if you are connecting to a remote metastore.
14/04/15 21:46:00 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/04/15 21:46:00 INFO Remoting: Starting remoting
14/04/15 21:46:00 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@hadoop-pg-5.cluster:38835]
14/04/15 21:46:00 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@hadoop-pg-5.cluster:38835]
14/04/15 21:46:00 INFO spark.SparkEnv: Registering BlockManagerMaster
5,108: [GC 262656K->26899K(1005568K), 0,0409080 secs]
14/04/15 21:46:00 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-local-20140415214600-9537
14/04/15 21:46:00 INFO storage.MemoryStore: MemoryStore started with
capacity 589.2 MB.
14/04/15 21:46:00 INFO network.ConnectionManager: Bound socket to port
51889 with id = ConnectionManagerId(hadoop-pg-5.cluster,51889)
14/04/15 21:46:00 INFO storage.BlockManagerMaster: Trying to register
BlockManager
14/04/15 21:46:00 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
Registering block manager hadoop-pg-5.cluster:51889 with 589.2 MB RAM
14/04/15 21:46:00 INFO storage.BlockManagerMaster: Registered BlockManager
14/04/15 21:46:00 INFO spark.HttpServer: Starting HTTP Server
14/04/15 21:46:00 INFO server.Server: jetty-7.6.8.v20121106
14/04/15 21:46:00 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:59414
14/04/15 21:46:00 INFO broadcast.HttpBroadcast: Broadcast server started at
http://10.147.210.5:59414
14/04/15 21:46:01 INFO spark.SparkEnv: Registering MapOutputTracker
14/04/15 21:46:01 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-cf56ada9-d950-4abc-a1c3-76fecdc4faa3
14/04/15 21:46:01 INFO spark.HttpServer: Starting HTTP Server
14/04/15 21:46:01 INFO server.Server: jetty-7.6.8.v20121106
14/04/15 21:46:01 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:45689
14/04/15 21:46:01 INFO server.Server: jetty-7.6.8.v20121106
14/04/15 21:46:01 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/storage,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages/stage,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages/pool,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/environment,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/executors,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/metrics/json,null}
14/04/15 21:46:01 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/static,null}
14/04/15 

Problem with KryoSerializer

2014-04-15 Thread yh18190
Hi,

I have a problem when I try to use the Spark KryoSerializer: I extend
KryoRegistrator to register my custom classes, but I get the following
exception when I run the program below. Please let me know what the problem
could be.
] (run-main) org.apache.spark.SparkException: Job failed:
java.io.NotSerializableException: main.scala.Utilities

Registering classes objects:

package main.scala

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MykryoRegistrar extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[main.scala.Meter_data])
    kryo.register(classOf[main.scala.Utilities])
  }
}
MeterData_PerDay:Main class

package main.scala

import org.apache.spark.SparkContext

object MeterData_PerDay {

  def main(args: Array[String]) {

    System.setProperty("spark.serializer",
      "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator",
      "main.scala.MykryoRegistrar")

    var utilclass: Utilities = new Utilities()

    val sc = new SparkContext("local", "Simple App",
      utilclass.spark_home,
      List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))

    val file = sc.textFile(utilclass.data_home)
  }
}
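
The NotSerializableException points at task-closure serialization, which still
uses Java serialization even when Kryo is enabled for data, so registering
Utilities with Kryo does not help if a task closure captures a Utilities
instance. A minimal sketch of one possible fix, assuming Utilities only holds
plain configuration values (the fields below are placeholders, not the
original class):

package main.scala

// Hypothetical sketch: mark the helper class Serializable so that any task
// closure that happens to capture it can be shipped to executors.
// Field names and values are placeholders.
class Utilities extends Serializable {
  val spark_home = "/path/to/spark"
  val data_home = "hdfs:///path/to/meter_data"
}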
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-KryoSerializer-tp4295.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


StackOverflow Error when run ALS with 100 iterations

2014-04-15 Thread Xiaoli Li
Hi,

I am testing ALS on 7 nodes. Each node has 4 cores and 8G of memory. The ALS
program cannot run even with a very small training set (about 91 lines) due to
a StackOverflow error when I set the number of iterations to 100. I think the
problem may be caused by the updateFeatures method, which updates the products
RDD iteratively by joining it with the previous products RDD.


I am writing a program which has a similar update process to ALS. The same
problem appears when I iterate too many times (more than 80).

The iterative part of my code is as follows:

solution = outlinks.join(solution).map {
  ...
}


Has anyone had a similar problem? Thanks.


Xiaoli


Re: partitioning of small data sets

2014-04-15 Thread Nicholas Chammas
Looking at the Python version of textFile()
(http://spark.apache.org/docs/latest/api/pyspark/pyspark.context-pysrc.html#SparkContext.textFile),
shouldn't it be *max*(self.defaultParallelism, 2)?

If the default parallelism is, say 4, wouldn't we want to use that for
minSplits instead of 2?


On Tue, Apr 15, 2014 at 1:04 PM, Matei Zaharia matei.zaha...@gmail.comwrote:

 Yup, one reason it’s 2 actually is to give people a similar experience to
 working with large files, in case their code doesn’t deal well with the
 file being partitioned.

 Matei

 On Apr 15, 2014, at 9:53 AM, Aaron Davidson ilike...@gmail.com wrote:

 Take a look at the minSplits argument for SparkContext#textFile [1] -- the
 default value is 2. You can simply set this to 1 if you'd prefer not to
 split your data.

 [1]
 http://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.SparkContext
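
 For example, a minimal sketch in the Scala shell (the file path is a
 placeholder) that keeps a tiny file in a single partition:

 // Pass minSplits = 1 so the small single-block file is not split further.
 val lines = sc.textFile("hdfs:///user/diana/tiny.txt", 1)
 println(lines.partitions.length)  // expected: 1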


 On Tue, Apr 15, 2014 at 8:44 AM, Diana Carroll dcarr...@cloudera.comwrote:

 I loaded a very tiny file into Spark -- 23 lines of text, 2.6kb

 Given the size, and that it is a single file, I assumed it would only be
 in a single partition.  But when I cache it,  I can see in the Spark App UI
 that it actually splits it into two partitions:

 sparkdev_2014-04-11.png

 Is this correct behavior?  How does Spark decide how big a partition
 should be, or how many partitions to create for an RDD?

 If it matters, I have only a single worker in my cluster, so both
 partitions are stored on the same worker.

 The file was on HDFS and was only a single block.

 Thanks for any insight.

 Diana







Multi-tenant?

2014-04-15 Thread Ian Ferreira
What is the support for multi-tenancy in Spark?

I assume more than one driver can share the same cluster, but can a driver run 
two jobs in parallel?



java.net.SocketException: Network is unreachable while connecting to HBase

2014-04-15 Thread amit karmakar
I am getting a java.net.SocketException: Network is unreachable whenever I
do a count on one of my tables.
If I just do a take(1), I see the task status as killed on the master UI,
but I get back the results.
My driver runs on my local system, which is accessible over the public
internet, and connects to a remote cluster.

This is the code I am trying out:

Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set("hbase.zookeeper.quorum",
    "xx.xx.xx.xx,xx.xx.xx.xx,xx.xx.xx.xx");
hbaseConf.set(TableInputFormat.INPUT_TABLE, "table");
JavaPairRDD<ImmutableBytesWritable, Result> rdd =
    sparkContext.newAPIHadoopRDD(hbaseConf, TableInputFormat.class,
        ImmutableBytesWritable.class, Result.class);
System.out.println("Count=" + rdd.count());

Please suggest what i am missing and how to fix this issue.

Thanks a lot.

14/04/15 22:39:22 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID
0 on executor 2: x (PROCESS_LOCAL)
14/04/15 22:39:22 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as
1731 bytes in 22 ms
14/04/15 22:39:24 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/04/15 22:39:24 WARN scheduler.TaskSetManager: Loss was due to
java.net.SocketException
java.net.SocketException: Network is unreachable
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:378)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:473)
at sun.net.www.http.HttpClient.init(HttpClient.java:203)
at sun.net.www.http.HttpClient.New(HttpClient.java:290)
at sun.net.www.http.HttpClient.New(HttpClient.java:306)
at
sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:995)
at
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:931)
at
sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:849)
at
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1299)
at
org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:156)
at
org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1872)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1970)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1894)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1970)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1894)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at
scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1872)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1970)
at

Re: Can't run a simple spark application with 0.9.1

2014-04-15 Thread Paul Schooss
I am a dork, please disregard this issue. I did not have the slaves
correctly configured. This error is very misleading.


On Tue, Apr 15, 2014 at 11:21 AM, Paul Schooss paulmscho...@gmail.comwrote:

 Hello,

 I recently deployed Spark 0.9.1 using a new way of starting it up:

 exec start-stop-daemon --start --pidfile /var/run/spark.pid
 --make-pidfile --chuid ${SPARK_USER}:${SPARK_GROUP} --chdir ${SPARK_HOME}
 --exec /usr/bin/java -- -cp ${CLASSPATH}
 -Dcom.sun.management.jmxremote.authenticate=false
 -Dcom.sun.management.jmxremote.ssl=false
 -Dcom.sun.management.jmxremote.port=10111
 -Dspark.akka.logLifecycleEvents=true -Djava.library.path=
 -XX:ReservedCodeCacheSize=512M -XX:+UseCodeCacheFlushing
 -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC
 -Dspark.executor.memory=10G -Xmx10g ${MAIN_CLASS} ${MAIN_CLASS_ARGS}


 where class path points to the spark jar that we compile with sbt. When I
 try to run a job I receive the following warning

 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check
 your cluster UI to ensure that workers are registered and have sufficient
 memory


 My first question is do I need the entire spark project on disk in order
 to run jobs? Or what else am I doing wrong?



Re: Multi-tenant?

2014-04-15 Thread Matei Zaharia
Yes, both things can happen. Take a look at 
http://spark.apache.org/docs/latest/job-scheduling.html, which includes 
scheduling concurrent jobs within the same driver.
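
For instance, a rough sketch of one driver submitting two jobs from separate
threads (the master URL is a placeholder; whether the jobs truly overlap
depends on the scheduling mode and available cores):

import org.apache.spark.SparkContext

object TwoJobs {
  def main(args: Array[String]) {
    val sc = new SparkContext("spark://master:7077", "two-jobs-sketch")
    val a = sc.parallelize(1 to 1000000)
    val b = sc.parallelize(1 to 1000000)

    // Each thread runs its own action; the driver schedules both sets of tasks.
    val t1 = new Thread { override def run() { println("evens: " + a.filter(_ % 2 == 0).count()) } }
    val t2 = new Thread { override def run() { println("sum:   " + b.reduce(_ + _)) } }
    t1.start(); t2.start()
    t1.join(); t2.join()
    sc.stop()
  }
}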

Matei

On Apr 15, 2014, at 4:08 PM, Ian Ferreira ianferre...@hotmail.com wrote:

 What is the support for multi-tenancy in Spark.
 
 I assume more than one driver can share the same cluster, but can a driver 
 run two jobs in parallel?
 



RE: Multi-tenant?

2014-04-15 Thread Ian Ferreira
Thanks Matei!

Sent from my Windows Phone

From: Matei Zaharia (mailto:matei.zaha...@gmail.com)
Sent: 4/15/2014 7:14 PM
To: user@spark.apache.org (mailto:user@spark.apache.org)
Subject: Re: Multi-tenant?

Yes, both things can happen. Take a look at 
http://spark.apache.org/docs/latest/job-scheduling.html, which includes 
scheduling concurrent jobs within the same driver.

Matei

On Apr 15, 2014, at 4:08 PM, Ian Ferreira ianferre...@hotmail.com wrote:

 What is the support for multi-tenancy in Spark.

 I assume more than one driver can share the same cluster, but can a driver 
 run two jobs in parallel?




Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1

2014-04-15 Thread giive chen
Hi Prasad

Sorry for missing your reply.
https://gist.github.com/thegiive/10791823
Here it is.

Wisely Chen


On Fri, Apr 4, 2014 at 11:57 PM, Prasad ramachandran.pra...@gmail.comwrote:

 Hi Wisely,
 Could you please post your pom.xml here.

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Error-reading-HDFS-file-using-spark-0-9-0-hadoop-2-2-0-incompatible-protobuf-2-5-and-2-4-1-tp2158p3770.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: storage.MemoryStore estimated size 7 times larger than real

2014-04-15 Thread wxhsdp
Thank you so much, Davidson.
Yes, you are right: in both sbt and the spark shell, the result of my code is
28MB, and it is independent of numSlices.
Yesterday I got a result of 4.2MB in the spark shell because, out of laziness,
I had removed the array initialization :)

for (i <- 0 until size) {
  array(i) = i
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/storage-MemoryStore-estimated-size-7-times-larger-than-real-tp4251p4306.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: StackOverflow Error when run ALS with 100 iterations

2014-04-15 Thread Cheng Lian
Probably this JIRA issue (https://spark-project.atlassian.net/browse/SPARK-1006)
solves your problem. When running with a large iteration number, the lineage
DAG of ALS becomes very deep, and both the DAGScheduler and the Java serializer
may overflow because they are implemented in a recursive way. You may resort to
checkpointing as a workaround.
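
For example, a rough sketch of that workaround in an iterative loop similar to
the one quoted below (all names, data, and the every-10-iterations interval
are made up for illustration):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("local", "checkpoint-sketch")
sc.setCheckpointDir("/tmp/spark-checkpoints")  // placeholder directory

val outlinks = sc.parallelize(Seq((1, Seq(2, 3)), (2, Seq(1)), (3, Seq(1))))
var solution = sc.parallelize(Seq((1, 1.0), (2, 1.0), (3, 1.0)))

for (i <- 1 to 100) {
  solution = outlinks.join(solution).map { case (id, (links, rank)) => (id, rank * 0.85) }
  if (i % 10 == 0) {       // periodically cut the lineage so it never gets too deep
    solution.checkpoint()
    solution.count()       // force evaluation so the checkpoint is actually written
  }
}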


On Wed, Apr 16, 2014 at 5:29 AM, Xiaoli Li lixiaolima...@gmail.com wrote:

 Hi,

 I am testing ALS using 7 nodes. Each node has 4 cores and 8G memeory. ALS
 program cannot run  even with a very small size of training data (about 91
 lines) due to StackVverFlow error when I set the number of iterations to
 100. I think the problem may be caused by updateFeatures method which
 updates products RDD iteratively by join previous products RDD.


 I am writing a program which has a similar update process with ALS.  This
 problem also appeared when I iterate too many times (more than 80).

 The iterative part of my code is as following:

 solution = outlinks.join(solution). map {
  ...
  }


 Has anyone had similar problem?  Thanks.


 Xiaoli



JMX with Spark

2014-04-15 Thread Paul Schooss
Has anyone got this working? I have enabled the properties for it in the
metrics.conf file and ensured that it is placed under Spark's home
directory. Any ideas why I don't see the Spark beans?


Re: JMX with Spark

2014-04-15 Thread Parviz Deyhim
The home directory, or the $home/conf directory? It works for me with
metrics.properties hosted under the conf dir.
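
For reference, a minimal sketch of what conf/metrics.properties can contain to
expose metrics over JMX; the sink line mirrors the one in
metrics.properties.template, but treat the exact class name as something to
verify against your Spark version:

# conf/metrics.properties — register the JMX sink for all metrics instances
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink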


On Tue, Apr 15, 2014 at 6:08 PM, Paul Schooss paulmscho...@gmail.comwrote:

 Has anyone got this working? I have enabled the properties for it in the
 metrics.conf file and ensure that it is placed under spark's home
 directory. Any ideas why I don't see spark beans ?



Re: java.net.SocketException: Network is unreachable while connecting to HBase

2014-04-15 Thread amit
In the worker logs i can see,

14/04/16 01:02:47 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@xx:10548] ->
[akka.tcp://sparkExecutor@xx:16041]: Error [Association failed with
[akka.tcp://sparkExecutor@xx:16041]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@xx:16041]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: xx/xx.xx.xx.xx:16041
]




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-net-SocketException-Network-is-unreachable-while-connecting-to-HBase-tp4301p4310.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: StackOverflow Error when run ALS with 100 iterations

2014-04-15 Thread Xiaoli Li
Thanks a lot for your information. It really helps me.


On Tue, Apr 15, 2014 at 7:57 PM, Cheng Lian lian.cs@gmail.com wrote:

 Probably this JIRA issue (https://spark-project.atlassian.net/browse/SPARK-1006)
 solves your problem. When running with large iteration number, the lineage
 DAG of ALS becomes very deep, both DAGScheduler and Java serializer may
 overflow because they are implemented in a recursive way. You may resort to
 checkpointing as a workaround.


 On Wed, Apr 16, 2014 at 5:29 AM, Xiaoli Li lixiaolima...@gmail.comwrote:

 Hi,

 I am testing ALS using 7 nodes. Each node has 4 cores and 8G memeory. ALS
 program cannot run  even with a very small size of training data (about 91
 lines) due to StackVverFlow error when I set the number of iterations to
 100. I think the problem may be caused by updateFeatures method which
 updates products RDD iteratively by join previous products RDD.


 I am writing a program which has a similar update process with ALS.  This
 problem also appeared when I iterate too many times (more than 80).

 The iterative part of my code is as following:

 solution = outlinks.join(solution). map {
  ...
  }


 Has anyone had similar problem?  Thanks.


 Xiaoli





Re: Why these operations are slower than the equivalent on Hadoop?

2014-04-15 Thread Yanzhe Chen
Eugen,

Thanks for your tip. I do want to merge the result of one partition with
another, but I am still not quite clear how to do it.

Say the original data RDD has 32 partitions. Since mapPartitions won’t change
the number of partitions, there will still be 32 partitions, each containing
the partial skyline of the points in that partition. Now I want to merge those
32 partial results to generate a new skyline. It would be better if I could use
reduce to merge them two at a time (rather than just collect them all into
one), but I think simply calling the reduce method on the RDD won’t work,
because it reduces the data at the granularity of individual points rather than
of partition results (which are collections of points). So is there a way to
reduce the data at the granularity of partitions?

Thanks,

Yanzhe  
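
One shape that seems to fit this question, and is essentially the same idea
Eugen sketches in the quoted reply below (assuming data: RDD[Point] and the
local skyline function from this thread):

// Sketch: collapse each partition to a single element (its partial skyline),
// then reduce merges those partition-level results pairwise.
val result: Array[Point] = data
  .mapPartitions(points => Iterator(skyline(points.toArray)))
  .reduce((left, right) => skyline(left ++ right))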

On Wednesday, April 16, 2014 at 2:24 AM, Eugen Cepoi wrote:

 It depends on your algorithm, but I guess that you probably should use reduce
 (the code probably doesn't compile, but it shows you the idea).

 val result = data.reduce { case (left, right) =>
   skyline(left ++ right)
 }

 Or, in the case you want to merge the result of a partition with another one,
 you could do:

 val result = data.mapPartitions { points =>
   // transforms the whole partition into a single element, but this may incur
   // some other problems, especially if you use Kryo serialization...
   Seq(skyline(points.toArray))
 }.reduce { case (left, right) =>
   skyline(left ++ right)
 }
  
  
  
  
 2014-04-15 19:37 GMT+02:00 Cheng Lian lian.cs@gmail.com 
 (mailto:lian.cs@gmail.com):
   
   Your Spark solution first reduces partial results into a single partition,
   computes the final result, and then collects it to the driver side. This
   involves a shuffle and two waves of network traffic. Instead, you can
   directly collect the partial results to the driver and then compute the
   final result on the driver side:

   val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
   val partialResults = data.mapPartitions(points => skyline(points.toArray).iterator).collect()
   val results = skyline(partialResults)
   
  On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen yanzhe...@gmail.com 
  (mailto:yanzhe...@gmail.com) wrote:
   
   
   
   
   
   Hi all,  

   As a previous thread, I am asking how to implement a divide-and-conquer 
   algorithm (skyline) in Spark.
   Here is my current solution:

    val data = sc.textFile(…).map(line => line.split(",").map(_.toDouble))

    val result = data.mapPartitions(points => skyline(points.toArray).iterator)
                     .coalesce(1, true)
                     .mapPartitions(points => skyline(points.toArray).iterator)
                     .collect()

   where skyline is a local algorithm to compute the results:  

    def skyline(points: Array[Point]): Array[Point]

    Basically, I find this implementation runs slower than the corresponding
    Hadoop version (an identity map phase plus the local skyline for both the
    combine and reduce phases).

   Below are my questions:

   1. Why this implementation is much slower than the Hadoop one?  

   I can find two possible reasons: one is the shuffle overhead in coalesce, 
   another is calling the toArray and iterator repeatedly when invoking 
   local skyline algorithm. But I am not sure which one is true.  
   
   
   
   
   
   
   
  I haven’t seen your Hadoop version. But if this assumption is right, the 
  above version should help.
   
   
   
   
   

   2. One observation is that while Hadoop version almost used up all the 
   CPU resources during execution, the CPU seems not that hot on Spark. Is 
   that a clue to prove that the shuffling might be the real bottleneck?  
   
   
   
   
   
   
   
   How many parallel tasks are there when running your Spark code? I suspect
   the tasks are being queued and run sequentially.
   
   
   
   
   

    3. Is there any difference between coalesce(1, true) and repartition? It
    seems that both operations need to shuffle data. What are the proper
    situations for using the coalesce method?
   
   
   
   
   
   
   
   repartition(n) is just an alias of coalesce(n, true), so yes, they both
   involve data shuffling. coalesce can be used to shrink the partition count
   when the dataset size shrinks dramatically after operations like filter. Say
   you have an RDD containing 1TB of data in 100 partitions; after a
   .filter(...) call only 20GB of data is left, so you may want to coalesce to
   2 partitions rather than 100.
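
A tiny sketch of that pattern (paths and sizes are made up):

val logs = sc.textFile("hdfs:///data/huge-logs")   // say ~1TB in 100 partitions
val errors = logs.filter(_.contains("ERROR"))      // far less data, still 100 partitions
val compacted = errors.coalesce(2)                 // shrink to 2 partitions, no full shuffle
val rebalanced = errors.repartition(100)           // same as coalesce(100, shuffle = true)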
   
   
   
   
   

   4. More generally, I am trying to implementing some core geometry 
   computation operators on Spark (like skyline, convex hull etc). In my 
   understanding, since Spark is more capable of handling iterative 
   computations on dataset, the above solution apparently doesn’t exploit 
   what Spark is good at. Any comments on 

what is the difference between element and partition?

2014-04-15 Thread Joe L




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-difference-between-element-and-partition-tp4317.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


groupByKey(None) returns partitions according to the keys?

2014-04-15 Thread Joe L
I was wondering whether groupByKey returns 2 partitions in the example below.

>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(x.groupByKey().collect())
[('a', [1, 1]), ('b', [1])]
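
For comparison, a quick check in the Scala shell (a sketch; the partition
count comes from the numPartitions argument or the default parallelism, not
from the number of distinct keys):

val x = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
val grouped = x.groupByKey(2)          // explicitly request 2 partitions
println(grouped.partitions.length)     // 2
grouped.collect().foreach(println)     // e.g. (a,[1, 1]) and (b,[1])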



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/groupByKey-None-returns-partitions-according-to-the-keys-tp4318.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.