SparkR Installation

2014-06-18 Thread Stuti Awasthi
Hi All,

I wanted to try SparkR. Do we need R preinstalled on all the nodes of the
cluster before installing the SparkR package? Please guide me on how to proceed
with this. As of now, I work with R only on a single node.
Please suggest.

Thanks
Stuti Awasthi






Re: Enormous EC2 price jump makes r3.large patch more important

2014-06-18 Thread Jeremy Lee
Ah, right. So only the launch script has changed. Everything else is still
essentially binary compatible?

Well, that makes it too easy! Thanks!


On Wed, Jun 18, 2014 at 2:35 PM, Patrick Wendell pwend...@gmail.com wrote:

 Actually you'll just want to clone the 1.0 branch then use the
 spark-ec2 script in there to launch your cluster. The --spark-git-repo
 flag is if you want to launch with a different version of Spark on the
 cluster. In your case you just need a different version of the launch
 script itself, which will be present in the 1.0 branch of Spark.

 - Patrick

 On Tue, Jun 17, 2014 at 9:29 PM, Jeremy Lee
 unorthodox.engine...@gmail.com wrote:
  I am about to spin up some new clusters, so I may give that a go... any
  special instructions for making them work? I assume I use the 
  --spark-git-repo= option on the spark-ec2 command. Is it as easy as
  concatenating your string as the value?
 
  On cluster management GUIs... I've been looking around at Ambari,
 Datastax,
  Cloudera, OpsCenter etc. Not totally convinced by any of them yet. Anyone
  using a good one I should know about? I'm really beginning to lean in the
  direction of Cassandra as the distributed data store...
 
 
  On Wed, Jun 18, 2014 at 1:46 PM, Patrick Wendell pwend...@gmail.com
 wrote:
 
  By the way, in case it's not clear, I mean our maintenance branches:
 
  https://github.com/apache/spark/tree/branch-1.0
 
  On Tue, Jun 17, 2014 at 8:35 PM, Patrick Wendell pwend...@gmail.com
  wrote:
   Hey Jeremy,
  
   This is patched in the 1.0 and 0.9 branches of Spark. We're likely to
   make a 1.0.1 release soon (this patch being one of the main reasons),
   but if you are itching for this sooner, you can just checkout the head
   of branch-1.0 and you will be able to use r3.XXX instances.
  
   - Patrick
  
   On Tue, Jun 17, 2014 at 4:17 PM, Jeremy Lee
   unorthodox.engine...@gmail.com wrote:
   Some people (me included) might have wondered why all our m1.large
 spot
   instances (in us-west-1) shut down a few hours ago...
  
   Simple reason: The EC2 spot price for Spark's default m1.large
   instances
   just jumped from 0.016 per hour, to about 0.750. Yes, Fifty times.
   Probably
   something to do with world cup.
  
   So far this is just us-west-1, but prices have a tendency to equalize
   across
   centers as the days pass. Time to make backups and plans.
  
   m3 spot prices are still down at $0.02 (and being new, will be
   bypassed by
   older systems), so it would be REAAALLYY nice if there had been some
   progress on that issue. Let me know if I can help with testing and
   whatnot.
  
  
   --
   Jeremy Lee  BCompSci(Hons)
 The Unorthodox Engineers
 
 
 
 
  --
  Jeremy Lee  BCompSci(Hons)
The Unorthodox Engineers




-- 
Jeremy Lee  BCompSci(Hons)
  The Unorthodox Engineers


Re: question about setting SPARK_CLASSPATH IN spark_env.sh

2014-06-18 Thread santhoma
Thanks. I hope this problem will go away once I upgrade to Spark 1.0, where we
can send the cluster-wide classpath using the spark-submit command.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/question-about-setting-SPARK-CLASSPATH-IN-spark-env-sh-tp7809p7822.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Memory footprint of Calliope: Spark - Cassandra writes

2014-06-18 Thread tj opensource
Gerard,

We haven't done a test of Calliope vs. a plain driver.

The thing is, Calliope builds on C* Thrift (and the latest build on the DataStax
driver), so performance in terms of simple writes will be similar to any
existing driver. But that is not the use case for Calliope.

It is built to be used from Spark and to harness the distributed nature of
Spark. With a regular driver you would have to take care of multithreading,
splitting the data, etc., while with Spark and Calliope this comes for free.

Regards,
Rohit



On Tue, Jun 17, 2014 at 9:24 PM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi Rohit,

 Thanks a lot for looking at this. The intention of calculating the data
 upfront is to benchmark only the time it takes to store, in records/sec,
 eliminating the generation factor (which will be different in the real
 scenario, reading from HDFS).
 I used a profiler today and indeed it's not the storage part, but the
 generation that's bloating the memory. Objects in memory take surprisingly
 more space than one would expect based on the data they hold. In my case it
 was 2.1x the size of the original data.

 Now that we are talking about this, do you have some figures on how
 Calliope compares, performance-wise, to a classic Cassandra driver
 (DataStax / Astyanax)? That would be awesome.

 Thanks again!

 -kr, Gerard.





 On Tue, Jun 17, 2014 at 4:27 PM, tj opensource opensou...@tuplejump.com
 wrote:

 Dear Gerard,

 I just tried the code you posted in the gist (
 https://gist.github.com/maasg/68de6016bffe5e71b78c) and it does give an
 OOM. It is caused by the data being generated locally and then parallelized
 -


 --

 val entries = for (i <- 1 to total) yield {
   Array(s"devy$i", "aggr", "1000", "sum", (i to i+10).mkString(","))
 }

 val rdd = sc.parallelize(entries, 8)

 --





 This will generate all the data on the local system and then try to
 partition it.

 Instead, we should parallelize the keys (i <- 1 to total) and generate the
 data in the map tasks. This is *closer* to what you will get if you
 distribute a file out on a DFS like HDFS/SnackFS.

 I have made the change in the script here (
 https://gist.github.com/milliondreams/aac52e08953949057e7d)


 --

 val rdd = sc.parallelize(1 to total, 8).map(i => Array(s"devy$i",
   "aggr", "1000", "sum", (i to i+10).mkString(",")))

 --





 I was able to insert 50M records using just over 350M RAM. Attaching the
 log and screenshot.

 Let me know if you still face this issue... we can do a screen share and
 resolve the issue there.

 And thanks for using Calliope. I hope it serves your needs.

 Cheers,
 Rohit


 On Mon, Jun 16, 2014 at 9:57 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi,

 I've been doing some testing with Calliope as a way to do batch load
 from Spark into Cassandra.
 My initial results are promising on the performance area, but worrisome
 on the memory footprint side.

 I'm generating N records of about 50 bytes each and using the UPDATE
 mutator to insert them into C*. I get OOM if my memory is below 1GB per
 million records, or about 50MB of raw data (without counting any
 RDD/structural overhead). (See code [1])

 (So, to avoid confusion: I need 4GB of RAM to save 4M 50-byte records to
 Cassandra.) That's an order of magnitude more than the raw data.

 I understood that Calliope builds on top of the Hadoop support of
 Cassandra, which builds on top of SSTables and sstableloader.

 I would like to know what the memory usage factor of Calliope is and what
 parameters I could use to control/tune it.

 Any experience/advice on that?

  -kr, Gerard.

 [1] https://gist.github.com/maasg/68de6016bffe5e71b78c






Re: Wildcard support in input path

2014-06-18 Thread Jianshi Huang
Hi Andrew,

Strangely, in my Spark (1.0.0 compiled against Hadoop 2.4.0) log it says
file not found. I'll try again.

Jianshi


On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash and...@andrewash.com wrote:

 In Spark you can use the normal globs supported by Hadoop's FileSystem,
 which are documented here:
 http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path)
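
 For example, a minimal sketch of the kinds of glob patterns Hadoop's
 FileSystem accepts, with purely illustrative paths (whether parquetFile
 resolves them the same way is what is being tested further down the thread):

 // Globs are expanded by Hadoop's FileSystem when the path is resolved,
 // so they can be passed straight to textFile.
 val oneDay   = sc.textFile("hdfs:///path/to/data_file_2013SEP01*")
 val someDays = sc.textFile("hdfs:///path/to/data_file_2013SEP{01,02,03}*")
 val allDays  = sc.textFile("hdfs:///path/to/data_file_2013SEP[0-3][0-9]*")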


 On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW meethu2...@yahoo.co.in
 wrote:

 Hi Jianshi,

 I have used wildcard characters (*) in my program and it worked.
 My code was like this:
 b = sc.textFile("hdfs:///path to file/data_file_2013SEP01*")

 Thanks  Regards,
 Meethu M


   On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:


  It would be convenient if Spark's textFile, parquetFile, etc. can
 support path with wildcard, such as:

   hdfs://domain/user/jianshuang/data/parquet/table/month=2014*

  Or is there already a way to do it now?

 Jianshi

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/






-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Wildcard support in input path

2014-06-18 Thread Jianshi Huang
Hi all,

Thanks for the reply. I'm using parquetFile as input; is that a problem? In
hadoop fs -ls, the path
(hdfs://domain/user/jianshuang/data/parquet/table/month=2014*)
will list all the files.

I'll test it again.

Jianshi


On Wed, Jun 18, 2014 at 2:23 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi Andrew,

 Strangely in my spark (1.0.0 compiled against hadoop 2.4.0) log, it says
 file not found. I'll try again.

 Jianshi


 On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash and...@andrewash.com wrote:

 In Spark you can use the normal globs supported by Hadoop's FileSystem,
 which are documented here:
 http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path)


 On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW meethu2...@yahoo.co.in
 wrote:

 Hi Jianshi,

 I have used wild card characters (*) in my program and it worked..
 My code was like this
 b = sc.textFile(hdfs:///path to file/data_file_2013SEP01*)

 Thanks  Regards,
 Meethu M


   On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:


  It would be convenient if Spark's textFile, parquetFile, etc. can
 support path with wildcard, such as:

   hdfs://domain/user/jianshuang/data/parquet/table/month=2014*

  Or is there already a way to do it now?

 Jianshi

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/






 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Contribution to Spark MLLib

2014-06-18 Thread Jayati
Hello Xiangrui,

Thanks for sharing the roadmap. It really helped.

Regards,
Jayati





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Contribution-to-Spark-MLLib-tp7716p7826.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Example with CDH5

2014-06-18 Thread Sean Owen
There is nothing special about CDH5 Spark in this regard. CDH 5.0.x has
Spark 0.9.0, and the imminent next release will have 1.0.0 + upstream
patches.

You're simply accessing a class that was not present in 0.9.0, but is
present after that:

https://github.com/apache/spark/commits/master/core/src/main/scala/org/apache/spark/SecurityManager.scala




On Wed, Jun 18, 2014 at 3:14 AM, manas Kar manas@exactearth.com wrote:

 Hi Spark Gurus,
  I am trying to compile a Spark Streaming example with CDH5 and am having
 trouble compiling it.
 Has anyone created an example Spark Streaming build using CDH5 (preferably
 Spark 0.9.1) who would be kind enough to share the build.sbt(.scala) file (or
 point to their example on GitHub)? I know there is a streaming example here:
 https://github.com/apache/spark/tree/master/examples   but I am looking
 for something that runs with CDH5.


 My build.scala files looks like given below.

  object Dependency {
 // Versions
 object V {
 val Akka = "2.3.0"
 val scala = "2.10.4"
 val cloudera = "0.9.0-cdh5.0.0"
 }

 val sparkCore  = "org.apache.spark" %% "spark-core" % V.cloudera
 val sparkStreaming = "org.apache.spark" %% "spark-streaming" % V.cloudera

 resolvers ++= Seq("cloudera repo" at
 "https://repository.cloudera.com/artifactory/cloudera-repos/",
   "hadoop repo" at
 "https://repository.cloudera.com/content/repositories/releases/")

 I have also attached the complete build.scala file for sake of
 completeness.
 sbt dist gives the following error:
  object SecurityManager is not a member of package org.apache.spark
 [error] import org.apache.spark.{SparkConf, SecurityManager}


 build.scala
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n7796/build.scala
 


 Appreciate the great work the spark community is doing. It is by far the
 best thing I have worked on.

 ..Manas



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Example-with-CDH5-tp7796.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: join operation is taking too much time

2014-06-18 Thread MEETHU MATHEW
Hi,
Thanks Andrew and Daniel for the response.

Setting spark.shuffle.spill to false didn't make any difference: 5 days
completed in 6 minutes and 10 days was stuck after around 1 hour.

Daniel, in my current use case I can't read all the files into a single RDD.
But I have another use case where I did it that way, i.e. I read all the files
into a single RDD and joined it with the RDD of 9 million rows, and it worked
fine and took only 3 minutes.
 
Thanks  Regards, 
Meethu M


On Wednesday, 18 June 2014 12:11 AM, Daniel Darabos 
daniel.dara...@lynxanalytics.com wrote:
 


I've been wondering about this. Is there a difference in performance between 
these two?

val rdd1 = sc.textFile(files.mkString(","))
val rdd2 = sc.union(files.map(sc.textFile(_)))

I don't know about your use-case, Meethu, but it may be worth trying to see if 
reading all the files into one RDD (like rdd1) would perform better in the 
join. (If this is possible in your situation.)
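
For what it's worth, here is a minimal sketch of that single-RDD variant, using
hypothetical paths and a hypothetical comma-separated format (not Meethu's
actual data), with one leftOuterJoin against the combined RDD instead of 30
successive joins:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("local[4]", "join-sketch")

// Hypothetical list of the daily files.
val files = Seq("hdfs:///data/day01.csv", "hdfs:///data/day02.csv")

// Read everything as one RDD of (key, value) pairs.
val all = sc.union(files.map(sc.textFile(_)))
  .map { line => val f = line.split(","); (f(0), f(1)) }

// The large 9-million-row RDD, parsed the same way.
val big = sc.textFile("hdfs:///data/big.csv")
  .map { line => val f = line.split(","); (f(0), f(1)) }

// A single join instead of a loop of joins.
val joined = big.leftOuterJoin(all)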




On Tue, Jun 17, 2014 at 6:45 PM, Andrew Or and...@databricks.com wrote:

How long does it get stuck for? This is a common sign of the OS thrashing due
to out-of-memory exceptions. If you keep it running longer, does it throw an
error?


Depending on how large your other RDD is (and your join operation), memory
pressure may or may not be the problem at all. It could be that spilling your
shuffles to disk is slowing you down (but that probably shouldn't hang your
application). For the 5-RDD case, what happens if you set spark.shuffle.spill
to false?
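
A minimal sketch of how that setting could be applied when the context is
created (the property name is as suggested above; the app name and the rest
are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Keep shuffle data in memory instead of spilling it to disk.
val conf = new SparkConf()
  .setAppName("join-test")
  .set("spark.shuffle.spill", "false")

val sc = new SparkContext(conf)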



2014-06-17 5:59 GMT-07:00 MEETHU MATHEW meethu2...@yahoo.co.in:




 Hi all,


I want to do a recursive leftOuterJoin between an RDD (created from a file)
with 9 million rows (the size of the file is 100MB) and 30 other RDDs (created
from 30 different files in each iteration of a loop) varying from 1 to 6
million rows.
When I run it for 5 RDDs, it runs successfully in 5 minutes. But when I
increase it to 10 or 30 RDDs it gradually slows down and finally gets stuck
without showing any warning or error.


I am running in standalone mode with 2 workers of 4GB each and a total of 16
cores.


Are any of you facing similar problems with JOIN, or is it a problem with my
configuration?


Thanks  Regards, 
Meethu M


Re: Unit test failure: Address already in use

2014-06-18 Thread Anselme Vignon
Hi,

Could your problem come from the fact that you run your tests in parallel?

If you are running Spark in local mode, you cannot have concurrent Spark
instances running. This means that your tests instantiating a SparkContext
cannot be run in parallel. The easiest fix is to tell sbt not to run tests in
parallel. This can be done by adding the following line to your build.sbt:

parallelExecution in Test := false

Cheers,

Anselme




2014-06-17 23:01 GMT+02:00 SK skrishna...@gmail.com:

 Hi,

  I have 3 unit tests (independent of each other) in the /src/test/scala
 folder. When I run each of them individually using sbt "test-only <test>",
 all 3 pass. But when I run them all using sbt test, then they
 fail with the warning below. I am wondering if the binding exception
 results in failure to run the job, thereby causing the failure. If so, what
 can I do to address this binding exception? I am running these tests locally
 on a standalone machine (i.e. SparkContext("local", "test")).


 14/06/17 13:42:48 WARN component.AbstractLifeCycle: FAILED
 org.eclipse.jetty.server.Server@3487b78d: java.net.BindException: Address
 already in use
 java.net.BindException: Address already in use
 at sun.nio.ch.Net.bind0(Native Method)
 at sun.nio.ch.Net.bind(Net.java:174)
 at
 sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139)
 at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77)


 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Unit-test-failure-Address-already-in-use-tp7771.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: get schema from SchemaRDD

2014-06-18 Thread Michael Armbrust
We just merged a feature into master that lets you print the schema or view
it as a string (printSchema() and schemaTreeString on SchemaRDD).

There is also this JIRA, targeting 1.1, for presenting a nice programmatic API
for this information: https://issues.apache.org/jira/browse/SPARK-2179
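
A minimal sketch of how that would be used on the SchemaRDD from the example
below, assuming a build that already contains the newly merged feature (method
names as mentioned above; everything else is illustrative):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

val loadedPeopleRDD = sqlContext.parquetFile("people.parquet")

// Print the column names and types read back from the Parquet metadata.
loadedPeopleRDD.printSchema()

// Or capture the same information as a string.
val schemaDescription = loadedPeopleRDD.schemaTreeString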


On Wed, Jun 18, 2014 at 10:36 AM, Kevin Jung itsjb.j...@samsung.com wrote:

  Can I get schema information from a SchemaRDD?
  For example,

  case class Person(name: String, Age: Int, Gender: String, Birth: String)
  val peopleRDD = sc.textFile("/sample/sample.csv").map(_.split(",")).map(p =>
  Person(p(0).toString, p(1).toInt, p(2).toString, p(3).toString))
  peopleRDD.saveAsParquetFile("people.parquet")

  (few days later...)

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext._
  val loadedPeopleRDD = sqlContext.parquetFile("people.parquet")
  loadedPeopleRDD.registerAsTable("peopleTable")

  Someone who doesn't know the Person class can't know what columns and types
  this table has.
  Maybe they want to get the schema information from loadedPeopleRDD.
  How can I do this?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/get-schema-from-SchemaRDD-tp7830.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: rdd.cache() is not faster?

2014-06-18 Thread Gaurav Jain
You cannot assume that caching will always reduce the execution time,
especially if the data set is large. It appears that if too much memory is
used for caching, then less memory is left for the actual computation
itself. There has to be a balance between the two.

Page 33 of this thesis from KTH discusses this:
http://www.diva-portal.org/smash/get/diva2:605106/FULLTEXT01.pdf

Best



-
Gaurav Jain
Master's Student, D-INFK
ETH Zurich
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/rdd-cache-is-not-faster-tp7804p7835.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Cannot print a derived DStream after reduceByKey

2014-06-18 Thread haopu
I guess this is a basic question about the usage of reduce. Please shed some
light, thank you!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-print-a-derived-DStream-after-reduceByKey-tp7834p7836.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Cannot print a derived DStream after reduceByKey

2014-06-18 Thread haopu
In the test application, I create a DStream by connecting to a socket.
Then I want to count the RDDs in the DStream that match against another
reference RDD.
Below is the Java code for my application.

==
import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TestSparkStreaming {

    public static void main(String[] args) {
        // Function to make a pair of Strings
        class StringToPair implements PairFunction<String, String, String> {
            String value_;
            StringToPair(String value) {
                value_ = value;
            }
            @Override
            public Tuple2<String, String> call(String arg0) throws Exception {
                return new Tuple2<String, String>(arg0, value_);
            }
        }

        JavaStreamingContext jssc = new JavaStreamingContext("local",
                "TestSparkStreaming", new Duration(1000));
        JavaReceiverInputDStream<String> networkevents =
                jssc.socketTextStream("localhost", /* port elided in archive */);

        // Pair each input line with "world"
        JavaPairDStream<String, String> streamEvents =
                networkevents.mapToPair(new StringToPair("world"));

        // Construct a "hello" -> "spark" pair for the input line to join with
        JavaSparkContext sc = new JavaSparkContext(new SparkConf());
        List<String> list = Arrays.asList("hello");
        JavaRDD<String> reference = sc.parallelize(list);
        final JavaPairRDD<String, String> referenceData =
                reference.mapToPair(new StringToPair("spark"));

        class MatchInputLine implements
                PairFunction<Tuple2<String, String>, String, Long> {
            @Override
            public Tuple2<String, Long> call(Tuple2<String, String> t)
                    throws Exception {
                final String inputKey = t._1;
                final String inputValue = t._2;
                final List<String> ret = referenceData.lookup(inputKey);
                return new Tuple2<String, Long>(inputKey,
                        new Long((ret != null) ? ret.size() : 0));
            }
        }

        // Construct an output DStream if matched
        JavaPairDStream<String, Long> joinedStream =
                streamEvents.mapToPair(new MatchInputLine());

        // Count the output
        class Count implements Function2<Long, Long, Long> {
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        }
        JavaPairDStream<String, Long> aggregatedJoinedStream =
                joinedStream.reduceByKey(new Count());

        // Print the output
        aggregatedJoinedStream.count().print();

        jssc.start();
        jssc.awaitTermination();
    }
}
==

I'm testing on Windows in local mode (1.0.0). After I start the socket
server (the nc program mentioned in Spark's documentation) and submit the
packaged jar to Spark, I expect to see output when I type "hello" in.
However, I didn't see any output. I saw the message below in the console where
I submitted the jar.

== 
14/06/18 18:17:48 INFO JobScheduler: Added jobs for time 1403086668000 ms 
14/06/18 18:17:48 INFO MemoryStore: ensureFreeSpace(12) called with
curMem=0, maxMem=1235327385 
14/06/18 18:17:48 INFO MemoryStore: Block input-0-1403086668400 stored as
bytes to memory (size 12.0 B, free 1178.1 MB)
14/06/18 18:17:48 INFO BlockManagerInfo: Added input-0-1403086668400 in
memory on PEK-WKST68449:60769 (size: 12.0 B, free: 1178.1 MB) 
14/06/18 18:17:48 INFO BlockManagerMaster: Updated info of block
input-0-1403086668400 
14/06/18 18:17:48 INFO SendingConnection: Initiating connection to
[PEK-WKST68449/10.101.3.75:60769] 
14/06/18 18:17:48 INFO ConnectionManager: Accepted connection from
[PEK-WKST68449/10.101.3.75] 
14/06/18 18:17:48 INFO SendingConnection: Connected to
[PEK-WKST68449/10.101.3.75:60769], 1 messages pending 
14/06/18 18:17:48 WARN BlockManager: Block input-0-1403086668400 already
exists on this machine; not re-adding it
14/06/18 18:17:48 INFO SendingConnection: Initiating connection to
[/127.0.0.1:60789] 
14/06/18 18:17:48 INFO ConnectionManager: Accepted connection from
[127.0.0.1/127.0.0.1] 
14/06/18 18:17:48 INFO SendingConnection: Connected to [/127.0.0.1:60789], 1
messages pending
14/06/18 18:17:48 INFO BlockGenerator: Pushed block input-0-1403086668400 
14/06/18 18:17:49 INFO ReceiverTracker: Stream 0 received 1 blocks 
14/06/18 18:17:49 INFO JobScheduler: Added jobs for time 1403086669000 ms 
== 

I see one Waiting Batches in Spark's monitoring UI. I'm 

BSP realization on Spark

2014-06-18 Thread Ghousia
Hi,

We are trying to implement a BSP model in Spark with the help of GraphX.
One thing I encountered is the Pregel operator in the Graph class. But what I
fail to understand is how the Master and Workers need to be assigned (as in
BSP), and how barrier synchronization would happen. The Pregel operator
provides a way to define a vertex program, but nothing is mentioned about
barrier synchronization.
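
For reference, a minimal sketch of the GraphX Pregel operator (here the
standard single-source shortest paths example on a toy graph); each round of
vertex-program execution plus message exchange is one BSP superstep, and GraphX
synchronizes between supersteps internally, so there is no explicit barrier in
user code. The graph and values below are purely illustrative:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = new SparkContext("local[2]", "pregel-sketch")

// Toy graph; the vertex attribute holds the current best distance from the source.
val edges: RDD[Edge[Double]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))
val sourceId = 1L
val graph = Graph.fromEdges(edges, Double.PositiveInfinity)
  .mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

val shortestPaths = graph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),          // vertex program
  triplet =>                                               // send messages
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else Iterator.empty,
  (a, b) => math.min(a, b)                                 // merge messages
)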

Any help in this regard is truly appreciated.

Many Thanks,
Ghousia.


Re: Java IO Stream Corrupted - Invalid Type AC?

2014-06-18 Thread Surendranauth Hiraman
Patrick,

My team is using shuffle consolidation but not speculation. We are also
using persist(DISK_ONLY) for caching.

Here are some config changes that are in our work in progress.

We've been trying for 2 weeks to get our production flow (maybe around
50-70 stages, a few forks and joins with up to 20 branches in the forks) to
run end to end without any success, running into other problems besides
this one as well. For example, we have run into situations where saving to
HDFS just hangs on a couple of tasks, which print nothing in
their logs and take no CPU. For testing, our input data is 10 GB
across 320 input splits and generates maybe around 200-300 GB of
intermediate and final data.


conf.set("spark.executor.memory", "14g") // TODO make this configurable

// shuffle configs
conf.set("spark.default.parallelism", "320") // TODO make this configurable
conf.set("spark.shuffle.consolidateFiles", "true")

conf.set("spark.shuffle.file.buffer.kb", "200")
conf.set("spark.reducer.maxMbInFlight", "96")

conf.set("spark.rdd.compress", "true")

// we ran into a problem with the default timeout of 60 seconds
// this is also being set in the master's spark-env.sh. Not sure if
// it needs to be in both places
conf.set("spark.worker.timeout", "180")

// akka settings
conf.set("spark.akka.threads", "300")
conf.set("spark.akka.timeout", "180")
conf.set("spark.akka.frameSize", "100")
conf.set("spark.akka.batchSize", "30")
conf.set("spark.akka.askTimeout", "30")

// block manager
conf.set("spark.storage.blockManagerTimeoutIntervalMs", "18")
conf.set("spark.blockManagerHeartBeatMs", "8")

-Suren



On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell pwend...@gmail.com wrote:

 Out of curiosity - are you guys using speculation, shuffle
 consolidation, or any other non-default option? If so that would help
 narrow down what's causing this corruption.

 On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
 suren.hira...@velos.io wrote:
  Matt/Ryan,
 
  Did you make any headway on this? My team is running into this also.
  Doesn't happen on smaller datasets. Our input set is about 10 GB but we
  generate 100s of GBs in the flow itself.
 
  -Suren
 
 
 
 
  On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton compton.r...@gmail.com
 wrote:
 
  Just ran into this today myself. I'm on branch-1.0 using a CDH3
  cluster (no modifications to Spark or its dependencies). The error
  appeared trying to run GraphX's .connectedComponents() on a ~200GB
  edge list (GraphX worked beautifully on smaller data).
 
  Here's the stacktrace (it's quite similar to yours
  https://imgur.com/7iBA4nJ ).
 
  14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
  4 times; aborting job
  14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
  VertexRDD.scala:100
  Exception in thread main org.apache.spark.SparkException: Job
  aborted due to stage failure: Task 5.599:39 failed 4 times, most
  recent failure: Exception failure in TID 29735 on host node18:
  java.io.StreamCorruptedException: invalid type code: AC
 
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
  java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
 
 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
 
 
 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
 
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 
 
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
 
 
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  scala.collection.Iterator$class.foreach(Iterator.scala:727)
  scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 
 
 org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
 
 
 org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
 
 
 org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
 
 
 org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
  scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  scala.collection.Iterator$class.foreach(Iterator.scala:727)
  scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 
 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
 
 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  org.apache.spark.scheduler.Task.run(Task.scala:51)
 
  

Re: Contribution to Spark MLLib

2014-06-18 Thread Denis Turdakov
Hello everybody,

Xiangrui, thanks for the link to the roadmap. I saw it is planned to implement
LDA in MLlib 1.1. What do you think about PLSA?

I understand that LDA is more popular now, but recent research shows that
modifications of PLSA sometimes perform better [1]. Furthermore, the most
recent paper by the same authors shows that there is a clear way to extend PLSA
to LDA and beyond [2]. We could implement PLSA with these modifications in
MLlib. Is that interesting?

Actually, we already have an implementation of Robust PLSA on Spark, so the
task is to integrate it into MLlib.

1. A. Potapenko, K. Vorontsov. 2013. Robust PLSA performs better than LDA.
In Proceedings of ECIR'13.
2. Vorontsov, Potapenko. Tutorial on Probabilistic Topic Modeling: Additive
Regularization for Stochastic Matrix Factorization.
http://www.machinelearning.ru/wiki/images/1/1f/Voron14aist.pdf 

Best regards,
Denis.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Contribution-to-Spark-MLLib-tp7716p7844.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: rdd.cache() is not faster?

2014-06-18 Thread Wei Tan
Hi Gaurav, thanks for your pointer. The observation in the link is (at
least qualitatively) similar to mine.

Now the question is: if I do have big data (40GB, cached size 60GB) and
even bigger memory (192 GB), can I still not benefit from the RDD cache, and
should I persist on disk and leverage the filesystem cache?

I will try more workers so that each JVM has a smaller heap.

Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   Gaurav Jain ja...@student.ethz.ch
To: u...@spark.incubator.apache.org, 
Date:   06/18/2014 06:30 AM
Subject:Re: rdd.cache() is not faster?



You cannot assume that caching would always reduce the execution time,
especially if the data-set is large. It appears that if too much memory is
used for caching, then less memory is left for the actual computation
itself. There has to be a balance between the two. 

Page 33 of this thesis from KTH talks about this:
http://www.diva-portal.org/smash/get/diva2:605106/FULLTEXT01.pdf

Best



-
Gaurav Jain
Master's Student, D-INFK
ETH Zurich
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/rdd-cache-is-not-faster-tp7804p7835.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: rdd.cache() is not faster?

2014-06-18 Thread Gaurav Jain
 if I do have big data (40GB, cached size is 60GB) and even big memory (192
GB), I cannot benefit from RDD cache, and should persist on disk and
leverage filesystem cache?

The answer to the question of whether to persist (spill over) data to disk
is not always immediately clear, because generally the functions that compute
RDD partitions are not as expensive as retrieving a saved partition from
disk. That's why the default storage level never stores RDD partitions on
disk, and instead recomputes them on the fly. Also, you can try using Kryo
serialization (if you are not using it already) to reduce memory usage. Playing
around with different storage levels (MEMORY_ONLY_SER, for example) might
also help.
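
A minimal sketch of those two knobs (serialized caching and Kryo), with
illustrative names and paths; the right combination depends on the data:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("cache-tuning-sketch")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val sc = new SparkContext(conf)

val data = sc.textFile("hdfs:///some/large/input")

// Serialized in-memory caching: more CPU per access, smaller footprint than
// the default MEMORY_ONLY (deserialized objects).
data.persist(StorageLevel.MEMORY_ONLY_SER)

// Alternative: allow cached partitions to spill to local disk when memory is tight.
// data.persist(StorageLevel.MEMORY_AND_DISK_SER)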

Best
Gaurav Jain
Master's Student, D-INFK
ETH Zurich
Email: jaing at student dot ethz dot ch



-
Gaurav Jain
Master's Student, D-INFK
ETH Zurich
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/rdd-cache-is-not-faster-tp7804p7846.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Wildcard support in input path

2014-06-18 Thread Nicholas Chammas
Is that month= syntax something special, or do your files actually have
that string as part of their name?
​


On Wed, Jun 18, 2014 at 2:25 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi all,

 Thanks for the reply. I'm using parquetFile as input, is that a problem?
 In hadoop fs -ls, the path (hdfs://domain/user/
 jianshuang/data/parquet/table/month=2014*) will get list all the files.

 I'll test it again.

 Jianshi


 On Wed, Jun 18, 2014 at 2:23 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi Andrew,

 Strangely in my spark (1.0.0 compiled against hadoop 2.4.0) log, it says
 file not found. I'll try again.

 Jianshi


 On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash and...@andrewash.com
 wrote:

 In Spark you can use the normal globs supported by Hadoop's FileSystem,
 which are documented here:
 http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path)


 On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW meethu2...@yahoo.co.in
 wrote:

 Hi Jianshi,

 I have used wild card characters (*) in my program and it worked..
 My code was like this
 b = sc.textFile(hdfs:///path to file/data_file_2013SEP01*)

 Thanks  Regards,
 Meethu M


   On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:


  It would be convenient if Spark's textFile, parquetFile, etc. can
 support path with wildcard, such as:

   hdfs://domain/user/jianshuang/data/parquet/table/month=2014*

  Or is there already a way to do it now?

 Jianshi

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/






 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/



Re: Wildcard support in input path

2014-06-18 Thread Jianshi Huang
Hi Nicholas,

month= is for Hive to auto-discover the partitions. It's part of the URL of
my files.

Jianshi


On Wed, Jun 18, 2014 at 11:52 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Is that month= syntax something special, or do your files actually have
 that string as part of their name?
 ​


 On Wed, Jun 18, 2014 at 2:25 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi all,

 Thanks for the reply. I'm using parquetFile as input, is that a problem?
 In hadoop fs -ls, the path (hdfs://domain/user/
 jianshuang/data/parquet/table/month=2014*) will get list all the files.

 I'll test it again.

 Jianshi


 On Wed, Jun 18, 2014 at 2:23 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi Andrew,

 Strangely in my spark (1.0.0 compiled against hadoop 2.4.0) log, it says
 file not found. I'll try again.

 Jianshi


 On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash and...@andrewash.com
 wrote:

 In Spark you can use the normal globs supported by Hadoop's FileSystem,
 which are documented here:
 http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path)


 On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW meethu2...@yahoo.co.in
  wrote:

 Hi Jianshi,

 I have used wild card characters (*) in my program and it worked..
 My code was like this
 b = sc.textFile(hdfs:///path to file/data_file_2013SEP01*)

 Thanks  Regards,
 Meethu M


   On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:


  It would be convenient if Spark's textFile, parquetFile, etc. can
 support path with wildcard, such as:

   hdfs://domain/user/jianshuang/data/parquet/table/month=2014*

  Or is there already a way to do it now?

 Jianshi

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/






 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Wildcard support in input path

2014-06-18 Thread Nicholas Chammas
I wonder if that’s the problem. Is there an equivalent hadoop fs -ls
command you can run that returns the same files you want but doesn’t have
that month= string?
​


On Wed, Jun 18, 2014 at 12:25 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi Nicholas,

 month= is for Hive to auto discover the partitions. It's part of the url
 of my files.

 Jianshi


 On Wed, Jun 18, 2014 at 11:52 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Is that month= syntax something special, or do your files actually have
 that string as part of their name?
 ​


 On Wed, Jun 18, 2014 at 2:25 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi all,

 Thanks for the reply. I'm using parquetFile as input, is that a problem?
 In hadoop fs -ls, the path (hdfs://domain/user/
 jianshuang/data/parquet/table/month=2014*) will get list all the files.

 I'll test it again.

 Jianshi


 On Wed, Jun 18, 2014 at 2:23 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi Andrew,

 Strangely in my spark (1.0.0 compiled against hadoop 2.4.0) log, it
 says file not found. I'll try again.

 Jianshi


 On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash and...@andrewash.com
 wrote:

 In Spark you can use the normal globs supported by Hadoop's
 FileSystem, which are documented here:
 http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path)


 On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW 
 meethu2...@yahoo.co.in wrote:

 Hi Jianshi,

 I have used wild card characters (*) in my program and it worked..
 My code was like this
 b = sc.textFile(hdfs:///path to file/data_file_2013SEP01*)

 Thanks  Regards,
 Meethu M


   On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:


  It would be convenient if Spark's textFile, parquetFile, etc. can
 support path with wildcard, such as:

   hdfs://domain/user/jianshuang/data/parquet/table/month=2014*

  Or is there already a way to do it now?

 Jianshi

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/






 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/



HDFS folder .sparkStaging not deleted and filled up HDFS in yarn mode

2014-06-18 Thread Andrew Lee
Hi All,
Has anyone run into the same problem? Looking at the source code in the
official release (rc11), this property is set to false by default; however,
I'm seeing that the .sparkStaging folder remains on HDFS and fills up the disk
pretty fast, since SparkContext deploys the fat JAR file (~115MB) for every
job and it is not cleaned up.








yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala:
  val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files",
"false").toBoolean
[test@spark ~]$ hdfs dfs -ls .sparkStaging
Found 46 items
drwx--   - test users  0 2014-05-01 01:42 .sparkStaging/application_1398370455828_0050
drwx--   - test users  0 2014-05-01 02:03 .sparkStaging/application_1398370455828_0051
drwx--   - test users  0 2014-05-01 02:04 .sparkStaging/application_1398370455828_0052
drwx--   - test users  0 2014-05-01 05:44 .sparkStaging/application_1398370455828_0053
drwx--   - test users  0 2014-05-01 05:45 .sparkStaging/application_1398370455828_0055
drwx--   - test users  0 2014-05-01 05:46 .sparkStaging/application_1398370455828_0056
drwx--   - test users  0 2014-05-01 05:49 .sparkStaging/application_1398370455828_0057
drwx--   - test users  0 2014-05-01 05:52 .sparkStaging/application_1398370455828_0058
drwx--   - test users  0 2014-05-01 05:58 .sparkStaging/application_1398370455828_0059
drwx--   - test users  0 2014-05-01 07:38 .sparkStaging/application_1398370455828_0060
drwx--   - test users  0 2014-05-01 07:41 .sparkStaging/application_1398370455828_0061
…
drwx--   - test users  0 2014-06-16 14:45 .sparkStaging/application_1402001910637_0131
drwx--   - test users  0 2014-06-16 15:03 .sparkStaging/application_1402001910637_0135
drwx--   - test users  0 2014-06-16 15:16 .sparkStaging/application_1402001910637_0136
drwx--   - test users  0 2014-06-16 15:46 .sparkStaging/application_1402001910637_0138
drwx--   - test users  0 2014-06-16 23:57 .sparkStaging/application_1402001910637_0157
drwx--   - test users  0 2014-06-17 05:55 .sparkStaging/application_1402001910637_0161
Is this something that needs to be explicitly set, e.g.
SPARK_YARN_USER_ENV="spark.yarn.preserve.staging.files=false"?
Per http://spark.apache.org/docs/latest/running-on-yarn.html,
spark.yarn.preserve.staging.files defaults to false: "Set to true to preserve
the staged files (Spark jar, app jar, distributed cache files) at the end of
the job rather than delete them." Or is this a bug where the default value is
not honored and is overridden to true somewhere?
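
A minimal sketch of setting the property explicitly from application code,
just to rule out an override somewhere in the environment (the property name
is the one quoted from the YARN docs above; the rest is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("staging-cleanup-check")
  .set("spark.yarn.preserve.staging.files", "false")

val sc = new SparkContext(conf)
// ... job ...
sc.stop()  // .sparkStaging/<application id> should be removed when the app finishes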
Thanks.


  

RE: HDFS folder .sparkStaging not deleted and filled up HDFS in yarn mode

2014-06-18 Thread Andrew Lee
Forgot to mention that I am using spark-submit to submit jobs; a verbose-mode
printout with the SparkPi example looks like this. The .sparkStaging folder
won't be deleted. My thought is that this should be part of the staging and
should be cleaned up as well when the SparkContext is terminated.









[test@ spark]$ SPARK_YARN_USER_ENV=spark.yarn.preserve.staging.files=false 
SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop2.2.0.jar 
./bin/spark-submit --verbose --master yarn --deploy-mode cluster --class 
org.apache.spark.examples.SparkPi --driver-memory 512M --driver-library-path 
/opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo.jar --executor-memory 512M 
--executor-cores 1 --queue research --num-executors 2 
examples/target/spark-examples_2.10-1.0.0.jar 
















Using properties file: null
Using properties file: null
Parsed arguments:
  master  yarn
  deployMode  cluster
  executorMemory  512M
  executorCores   1
  totalExecutorCores  null
  propertiesFile  null
  driverMemory512M
  driverCores null
  driverExtraClassPathnull
  driverExtraLibraryPath  /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo.jar
  driverExtraJavaOptions  null
  supervise   false
  queue   research
  numExecutors2
  files   null
  pyFiles null
  archivesnull
  mainClass   org.apache.spark.examples.SparkPi
  primaryResource 
file:/opt/spark/examples/target/spark-examples_2.10-1.0.0.jar
  nameorg.apache.spark.examples.SparkPi
  childArgs   []
  jarsnull
  verbose true


Default properties from null:
  



Using properties file: null
Main class:
org.apache.spark.deploy.yarn.Client
Arguments:
--jar
file:/opt/spark/examples/target/spark-examples_2.10-1.0.0.jar
--class
org.apache.spark.examples.SparkPi
--name
org.apache.spark.examples.SparkPi
--driver-memory
512M
--queue
research
--num-executors
2
--executor-memory
512M
--executor-cores
1
System properties:
spark.driver.extraLibraryPath - 
/opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo.jar
SPARK_SUBMIT - true
spark.app.name - org.apache.spark.examples.SparkPi
Classpath elements:









Re: question about setting SPARK_CLASSPATH IN spark_env.sh

2014-06-18 Thread santhoma
By the way, any idea how to sync the Spark config dir with the other nodes in
the cluster?

~santhosh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/question-about-setting-SPARK-CLASSPATH-IN-spark-env-sh-tp7809p7853.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Unit test failure: Address already in use

2014-06-18 Thread Lisonbee, Todd

Disabling parallelExecution has worked for me.

Other alternatives I’ve tried that also work include:

1. Using a lock – this will let tests execute in parallel except for those 
using a SparkContext.  If you have a large number of tests that could execute 
in parallel, this can shave off some time.

import scala.concurrent.Lock

object TestingSparkContext {
  val lock = new Lock()
}

// before you instantiate your local SparkContext
TestingSparkContext.lock.acquire()

// after you call sc.stop()
TestingSparkContext.lock.release()


2. Sharing a local SparkContext between tests.

- This is nice because your tests will run faster.  Start-up and shutdown is
time consuming (can add a few seconds per test).

- The downside is that your tests are using the same SparkContext, so they are
less independent of each other.  I haven’t seen issues with this yet, but there
are likely some things that might crop up. A minimal sketch of this approach is
shown below.
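
A minimal sketch of the shared-context approach, assuming ScalaTest; the trait
and the port-property cleanup are illustrative, not the exact code I use:

import org.apache.spark.SparkContext
import org.scalatest.{BeforeAndAfterAll, Suite}

// Mix this into suites that need Spark: one SparkContext is shared by all
// tests in the suite and stopped once at the end.
trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
  @transient var sc: SparkContext = _

  override def beforeAll() {
    sc = new SparkContext("local", "test")
    super.beforeAll()
  }

  override def afterAll() {
    if (sc != null) sc.stop()
    sc = null
    System.clearProperty("spark.driver.port")  // helps avoid bind issues between suites
    super.afterAll()
  }
}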

Best,

Todd


From: Anselme Vignon [mailto:anselme.vig...@flaminem.com]
Sent: Wednesday, June 18, 2014 12:33 AM
To: user@spark.apache.org
Subject: Re: Unit test failure: Address already in use

Hi,

Could your problem come from the fact that you run your tests in parallel ?

If you are spark in local mode, you cannot have concurrent spark instances 
running. this means that your tests instantiating sparkContext cannot be run in 
parallel. The easiest fix is to tell sbt to not run parallel tests.
This can be done by adding the following line in your build.sbt:

parallelExecution in Test := false

Cheers,

Anselme



2014-06-17 23:01 GMT+02:00 SK skrishna...@gmail.com:
Hi,

I have 3 unit tests (independent of each other) in the /src/test/scala
folder. When I run each of them individually using: sbt test-only test,
all the 3 pass the test. But when I run them all using sbt test, then they
fail with the warning below. I am wondering if the binding exception results
in failure to run the job, thereby causing the failure. If so, what can I do
to address this binding exception? I am running these tests locally on a
standalone machine (i.e. SparkContext(local, test)).


14/06/17 13:42:48 WARN component.AbstractLifeCycle: FAILED
org.eclipse.jetty.server.Server@3487b78d: java.net.BindException: Address
already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:174)
at
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77)


thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unit-test-failure-Address-already-in-use-tp7771.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Unit test failure: Address already in use

2014-06-18 Thread Philip Ogren
In my unit tests I have a base class that all my tests extend that has a 
setup and teardown method that they inherit.  They look something like this:


var spark: SparkContext = _

@Before
def setUp() {
    Thread.sleep(100L) // this seems to give spark more time to
                       // reset from the previous test's tearDown
    spark = new SparkContext("local", "test spark")
}

@After
def tearDown() {
    spark.stop
    spark = null // not sure why this helps but it does!
    System.clearProperty("spark.master.port")
}


It's been since last fall (i.e. version 0.8.x) since I've examined this 
code and so I can't vouch that it is still accurate/necessary - but it 
still works for me.



On 06/18/2014 12:59 PM, Lisonbee, Todd wrote:


Disabling parallelExecution has worked for me.

Other alternatives I’ve tried that also work include:

1. Using a lock – this will let tests execute in parallel except for 
those using a SparkContext.  If you have a large number of tests that 
could execute in parallel, this can shave off some time.


object TestingSparkContext {

val lock = new Lock()

}

// before you instantiate your local SparkContext

TestingSparkContext.lock.acquire()

// after you call sc.stop()

TestingSparkContext.lock.release()

2. Sharing a local SparkContext between tests.

- This is nice because your tests will run faster.  Start-up and 
shutdown is time consuming (can add a few seconds per test).


- The downside is that your tests are using the same SparkContext so 
they are less independent of each other.  I haven’t seen issues with 
this yet but there are likely some things that might crop up.


Best,

Todd

*From:*Anselme Vignon [mailto:anselme.vig...@flaminem.com]
*Sent:* Wednesday, June 18, 2014 12:33 AM
*To:* user@spark.apache.org
*Subject:* Re: Unit test failure: Address already in use

Hi,

Could your problem come from the fact that you run your tests in 
parallel ?


If you are spark in local mode, you cannot have concurrent spark 
instances running. this means that your tests instantiating 
sparkContext cannot be run in parallel. The easiest fix is to tell sbt 
to not run parallel tests.


This can be done by adding the following line in your build.sbt:

parallelExecution in Test := false

Cheers,

Anselme

2014-06-17 23:01 GMT+02:00 SK skrishna...@gmail.com:


Hi,

I have 3 unit tests (independent of each other) in the /src/test/scala
folder. When I run each of them individually using: sbt test-only
test,
all the 3 pass the test. But when I run them all using sbt test,
then they
fail with the warning below. I am wondering if the binding
exception results
in failure to run the job, thereby causing the failure. If so,
what can I do
to address this binding exception? I am running these tests
locally on a
standalone machine (i.e. SparkContext(local, test)).


14/06/17 13:42:48 WARN component.AbstractLifeCycle: FAILED
org.eclipse.jetty.server.Server@3487b78d:
java.net.BindException: Address
already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:174)
at
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139)
at
sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77)


thanks



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Unit-test-failure-Address-already-in-use-tp7771.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.





Spark is now available via Homebrew

2014-06-18 Thread Nick Chammas
OS X / Homebrew users,

It looks like you can now download Spark simply by doing:

brew install apache-spark

I’m new to Homebrew, so I’m not too sure how people are intended to use
this. I’m guessing this would just be a convenient way to get the latest
release onto your workstation, and from there use spark-ec2 to launch
clusters.

Anyway, just a cool thing to point out.

Nick
​




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-now-available-via-Homebrew-tp7856.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: No Intercept for Python

2014-06-18 Thread Reza Zadeh
Hi Naftali,

Yes you're right. For now please add a column of ones. We are working on
adding a weighted regularization term, and exposing the scala intercept
option in the python binding.
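
For reference, a rough sketch of what that Scala-side intercept option looks
like (this assumes the setIntercept method exposed by Spark 1.0's
GeneralizedLinearAlgorithm; treat the exact names as an assumption rather than
a confirmed recipe):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def fitWithIntercept(train: RDD[LabeledPoint]) = {
  val lr = new LogisticRegressionWithSGD()
  lr.setIntercept(true)   // assumed API: ask the algorithm to fit an intercept
  lr.optimizer.setNumIterations(500).setStepSize(0.1)
  lr.run(train)           // returns a LogisticRegressionModel
}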

Best,
Reza


On Mon, Jun 16, 2014 at 12:19 PM, Naftali Harris naft...@affirm.com wrote:

 Hi everyone,

 The Python LogisticRegressionWithSGD does not appear to estimate an
 intercept.  When I run the following, the returned weights and intercept
 are both 0.0:

 from pyspark import SparkContext
 from pyspark.mllib.regression import LabeledPoint
 from pyspark.mllib.classification import LogisticRegressionWithSGD

 def main():
     sc = SparkContext(appName="NoIntercept")

     train = sc.parallelize([LabeledPoint(0, [0]), LabeledPoint(1, [0]),
                             LabeledPoint(1, [0])])

     model = LogisticRegressionWithSGD.train(train, iterations=500,
                                             step=0.1)
     print "Final weights: " + str(model.weights)
     print "Final intercept: " + str(model.intercept)

 if __name__ == "__main__":
     main()


 Of course, one can fit an intercept with the simple expedient of adding a
 column of ones, but that's kind of annoying.  Moreover, it looks like the
 scala version has an intercept option.

 Am I missing something? Should I just add the column of ones? If I
 submitted a PR doing that, is that the sort of thing you guys would accept?

 Thanks! :-)

 Naftali



Re: Spark is now available via Homebrew

2014-06-18 Thread Matei Zaharia
Interesting, does anyone know the people over there who set it up? It would be 
good if Apache itself could publish packages there, though I’m not sure what’s 
involved. Since Spark just depends on Java and Python it should be easy for us 
to update.

Matei

On Jun 18, 2014, at 1:37 PM, Nick Chammas nicholas.cham...@gmail.com wrote:

 OS X / Homebrew users,
 
 It looks like you can now download Spark simply by doing:
 
 brew install apache-spark
 I’m new to Homebrew, so I’m not too sure how people are intended to use this. 
 I’m guessing this would just be a convenient way to get the latest release 
 onto your workstation, and from there use spark-ec2 to launch clusters.
 
 Anyway, just a cool thing to point out.
 
 Nick
 
 ​
 
 View this message in context: Spark is now available via Homebrew
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: No Intercept for Python

2014-06-18 Thread Naftali Harris
Thanks Reza! :-D

Naftali


On Wed, Jun 18, 2014 at 1:47 PM, Reza Zadeh r...@databricks.com wrote:

 Hi Naftali,

 Yes you're right. For now please add a column of ones. We are working on
 adding a weighted regularization term, and exposing the scala intercept
 option in the python binding.

 Best,
 Reza


 On Mon, Jun 16, 2014 at 12:19 PM, Naftali Harris naft...@affirm.com
 wrote:

 Hi everyone,

 The Python LogisticRegressionWithSGD does not appear to estimate an
 intercept.  When I run the following, the returned weights and intercept
 are both 0.0:

 from pyspark import SparkContext
 from pyspark.mllib.regression import LabeledPoint
 from pyspark.mllib.classification import LogisticRegressionWithSGD

 def main():
 sc = SparkContext(appName="NoIntercept")

 train = sc.parallelize([LabeledPoint(0, [0]), LabeledPoint(1, [0]),
 LabeledPoint(1, [0])])

 model = LogisticRegressionWithSGD.train(train, iterations=500,
 step=0.1)
 print "Final weights: " + str(model.weights)
 print "Final intercept: " + str(model.intercept)

 if __name__ == "__main__":
 main()


 Of course, one can fit an intercept with the simple expedient of adding a
 column of ones, but that's kind of annoying.  Moreover, it looks like the
 scala version has an intercept option.

 Am I missing something? Should I just add the column of ones? If I
 submitted a PR doing that, is that the sort of thing you guys would accept?

 Thanks! :-)

 Naftali





Re: Spark is now available via Homebrew

2014-06-18 Thread Sheryl John
Cool.
Looked at the Pull Requests, the upgrade to 1.0.0 was just merged
yesterday. https://github.com/Homebrew/homebrew/pull/30231

https://github.com/Homebrew/homebrew/blob/master/Library/Formula/apache-spark.rb


On Wed, Jun 18, 2014 at 1:57 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Interesting, does anyone know the people over there who set it up? It
 would be good if Apache itself could publish packages there, though I'm not
 sure what's involved. Since Spark just depends on Java and Python it should
 be easy for us to update.

 Matei

 On Jun 18, 2014, at 1:37 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 OS X / Homebrew users,

 It looks like you can now download Spark simply by doing:

 brew install apache-spark

 I'm new to Homebrew, so I'm not too sure how people are intended to use
 this. I'm guessing this would just be a convenient way to get the latest
 release onto your workstation, and from there use spark-ec2 to launch
 clusters.

 Anyway, just a cool thing to point out.

 Nick

 --
 View this message in context: Spark is now available via Homebrew
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-now-available-via-Homebrew-tp7856.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.





-- 
-Sheryl


Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space

2014-06-18 Thread Shivani Rao
I am trying to process a file that contains 4 log lines (not very long) and
then write my parsed out case classes to a destination folder, and I get
the following error:


java.lang.OutOfMemoryError: Java heap space

at
org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)

at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)

at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)

at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)

at
org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

at java.lang.reflect.Method.invoke(Method.java:597)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)

at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)

at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)

at
org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

at java.lang.reflect.Method.invoke(Method.java:597)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)


Sadly, there are several folks that have faced this error while trying to
execute Spark jobs and there are various solutions, none of which work for
me


a) I tried (
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-java-lang-outOfMemoryError-Java-Heap-Space-td7735.html#a7736)
changing the number of partitions in my RDD by using coalesce(8) and the
error persisted

b)  I tried changing SPARK_WORKER_MEM=2g, SPARK_EXECUTOR_MEMORY=10g, and
both did not work

c) I strongly suspect there is a class path error (
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-set-spark-executor-memory-and-heap-size-td4719.html)
Mainly because the call stack is repetitive. Maybe the OOM error is a
disguise ?

d) I checked that i am not out of disk space and that i do not have too
many open files (ulimit -u && sudo ls /proc/spark_master_process_id/fd |
wc -l)


I am also noticing multiple reflection calls happening to find the right class,
I guess, so it could be a "class not found" error disguising itself as a
memory error.


Here are other threads that are encountering same situation .. but have not
been resolved in any way so far..


http://apache-spark-user-list.1001560.n3.nabble.com/no-response-in-spark-web-UI-td4633.html

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-program-thows-OutOfMemoryError-td4268.html


Any help is greatly appreciated. I am especially calling out to the creators of
Spark and the Databricks folks. This seems like a known bug waiting to happen.


Thanks,

Shivani

-- 
Software Engineer
Analytics Engineering Team@ Box
Mountain View, CA
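
For reference, a minimal sketch of setting executor memory programmatically
through SparkConf (available since 0.9); the value and app name are
placeholders, and this is not offered as a diagnosis of the error above:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("LogParser")                    // placeholder app name
  .set("spark.executor.memory", "4g")         // per-executor heap (placeholder)
val sc = new SparkContext(conf)

On a 1.0 cluster the same setting can also be passed as --executor-memory to
spark-submit.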


Re: Spark is now available via Homebrew

2014-06-18 Thread Nicholas Chammas
Agreed, it would be better if Apache controlled or managed this directly.

I think making such a change is just a matter of opening a new issue
https://github.com/Homebrew/homebrew/issues/new on the Homebrew issue
tracker. I believe that's how Spark made it in there in the first place--it
was just a user contribution.

Nick


On Wed, Jun 18, 2014 at 4:57 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Interesting, does anyone know the people over there who set it up? It
 would be good if Apache itself could publish packages there, though I’m not
 sure what’s involved. Since Spark just depends on Java and Python it should
 be easy for us to update.

 Matei

 On Jun 18, 2014, at 1:37 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 OS X / Homebrew users,

 It looks like you can now download Spark simply by doing:

 brew install apache-spark

 I’m new to Homebrew, so I’m not too sure how people are intended to use
 this. I’m guessing this would just be a convenient way to get the latest
 release onto your workstation, and from there use spark-ec2 to launch
 clusters.

 Anyway, just a cool thing to point out.

 Nick
 ​

 --
 View this message in context: Spark is now available via Homebrew
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-now-available-via-Homebrew-tp7856.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.





Re: Spark is now available via Homebrew

2014-06-18 Thread Nicholas Chammas
Matei,

You might want to comment on that issue Sheryl linked to, or perhaps this one
https://github.com/Homebrew/homebrew/issues/30228, to ask about how
Apache can manage this going forward. I know that mikemcquaid
https://github.com/mikemcquaid is very active on the Homebrew repo and is
one of the maintainers.

Nick


On Wed, Jun 18, 2014 at 5:17 PM, Sheryl John shery...@gmail.com wrote:

 Cool.
 Looked at the Pull Requests, the upgrade to 1.0.0 was just merged
 yesterday. https://github.com/Homebrew/homebrew/pull/30231


 https://github.com/Homebrew/homebrew/blob/master/Library/Formula/apache-spark.rb


 On Wed, Jun 18, 2014 at 1:57 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Interesting, does anyone know the people over there who set it up? It
 would be good if Apache itself could publish packages there, though I’m not
 sure what’s involved. Since Spark just depends on Java and Python it should
 be easy for us to update.

 Matei

 On Jun 18, 2014, at 1:37 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 OS X / Homebrew users,

 It looks like you can now download Spark simply by doing:

 brew install apache-spark

 I’m new to Homebrew, so I’m not too sure how people are intended to use
 this. I’m guessing this would just be a convenient way to get the latest
 release onto your workstation, and from there use spark-ec2 to launch
 clusters.

 Anyway, just a cool thing to point out.

 Nick

 --
 View this message in context: Spark is now available via Homebrew
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-now-available-via-Homebrew-tp7856.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.





 --
 -Sheryl



Re: Spark is now available via Homebrew

2014-06-18 Thread Andrew Ash
What's the advantage of Apache maintaining the brew installer vs users?

Apache handling it means more work on this dev team, but probably a better
experience for brew users.  Just wanted to weigh pros/cons before
committing to support this installation method.

Andrew


On Wed, Jun 18, 2014 at 5:29 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Matei,

 You might want to comment on that issue Sheryl linked to, or perhaps this
 one https://github.com/Homebrew/homebrew/issues/30228, to ask about how
 Apache can manage this going forward. I know that mikemcquaid
 https://github.com/mikemcquaid is very active on the Homebrew repo and
 is one of the maintainers.

 Nick


 On Wed, Jun 18, 2014 at 5:17 PM, Sheryl John shery...@gmail.com wrote:

 Cool.
 Looked at the Pull Requests, the upgrade to 1.0.0 was just merged
 yesterday. https://github.com/Homebrew/homebrew/pull/30231


 https://github.com/Homebrew/homebrew/blob/master/Library/Formula/apache-spark.rb


 On Wed, Jun 18, 2014 at 1:57 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Interesting, does anyone know the people over there who set it up? It
 would be good if Apache itself could publish packages there, though I’m not
 sure what’s involved. Since Spark just depends on Java and Python it should
 be easy for us to update.

 Matei

 On Jun 18, 2014, at 1:37 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 OS X / Homebrew users,

 It looks like you can now download Spark simply by doing:

 brew install apache-spark

 I’m new to Homebrew, so I’m not too sure how people are intended to use
 this. I’m guessing this would just be a convenient way to get the latest
 release onto your workstation, and from there use spark-ec2 to launch
 clusters.

 Anyway, just a cool thing to point out.

 Nick

 --
 View this message in context: Spark is now available via Homebrew
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-now-available-via-Homebrew-tp7856.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.





 --
 -Sheryl





Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space

2014-06-18 Thread Andrew Ash
Wait, so the file only has four lines and the job is running out of heap
space?  Can you share the code you're running that does the processing?
 I'd guess that you're doing some intense processing on every line but just
writing parsed case classes back to disk sounds very lightweight.



On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao raoshiv...@gmail.com wrote:

 I am trying to process a file that contains 4 log lines (not very long)
 and then write my parsed out case classes to a destination folder, and I
 get the following error:


 java.lang.OutOfMemoryError: Java heap space

 at
 org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)

 at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)

 at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)

 at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)

 at
 org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

 at java.lang.reflect.Method.invoke(Method.java:597)

 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)

 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)

 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)

 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)

 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)

 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)

 at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)

 at
 org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

 at java.lang.reflect.Method.invoke(Method.java:597)

 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)

 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)

 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)

 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)

 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)

 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)

 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)

 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)

 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)

 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)

 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)


 Sadly, there are several folks that have faced this error while trying to
 execute Spark jobs and there are various solutions, none of which work for
 me


 a) I tried (
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-java-lang-outOfMemoryError-Java-Heap-Space-td7735.html#a7736)
 changing the number of partitions in my RDD by using coalesce(8) and the
 error persisted

 b)  I tried changing SPARK_WORKER_MEM=2g, SPARK_EXECUTOR_MEMORY=10g, and
 both did not work

 c) I strongly suspect there is a class path error (
 http://apache-spark-user-list.1001560.n3.nabble.com/how-to-set-spark-executor-memory-and-heap-size-td4719.html)
 Mainly because the call stack is repetitive. Maybe the OOM error is a
 disguise ?

 d) I checked that i am not out of disk space and that i do not have too
  many open files (ulimit -u && sudo ls /proc/spark_master_process_id/fd |
 wc -l)


 I am also noticing multiple reflections happening to find the right
 class i guess, so it could be class Not Found: error disguising itself
 as a memory error.


 Here are other threads that are encountering same situation .. but have
 not been resolved in any way so far..



 http://apache-spark-user-list.1001560.n3.nabble.com/no-response-in-spark-web-UI-td4633.html


 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-program-thows-OutOfMemoryError-td4268.html


 Any help is greatly appreciated. I am especially calling out on creators
 of Spark and Databrick folks. This seems like a known bug waiting to
 happen.


 Thanks,

 Shivani

 --
 Software Engineer
 Analytics Engineering Team@ Box
 Mountain View, CA



java.lang.OutOfMemoryError with saveAsTextFile

2014-06-18 Thread Muttineni, Vinay
Hi,
I have a 5 million record, 300 column data set.
I am running a spark job in yarn-cluster mode, with the following args
--driver-memory 11G --executor-memory 11G --executor-cores 16  --num-executors 
500
The spark job replaces all categorical variables with some integers.
I am getting the below error when I try to save the transformed data set.

java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit 
exceeded)
java.util.Arrays.copyOfRange(Arrays.java:3209)
java.lang.String.init(String.java:215)
java.lang.StringBuilder.toString(StringBuilder.java:430)
java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3023)
java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2819)
java.io.ObjectInputStream.readString(ObjectInputStream.java:1598)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1319)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
scala.collection.AbstractIterator.toList(Iterator.scala:1157)
scala.collection.immutable.List.$plus$plus(List.scala:193)
DataProcessor.DataTypeConverter2$$anonfun$6.apply(DataTypeConverter2.scala:137)
DataProcessor.DataTypeConverter2$$anonfun$6.apply(DataTypeConverter2.scala:137)

The code is as follows:
  val transformedData = splitFileWithHeader.flatMap(rowArray => {
  try {
if (rowArray.sameElements(header.value)) {
  None
} else {
  val transformedArray: Array[String] = new 
Array[String](rowArray.length)
  for (i <- 0 until rowArray.length) {
//  Check 1 to see if the value should be replaced, Check 2 
to see if its a null value in which case, we do not update the value
if (broadcastReplacements.value(i) != null && 
rowArray(i).trim.toString != "") {
  transformedArray.update(i, 
broadcastReplacements.value(i)(rowArray(i).trim.toString).toString)
} else {
  transformedArray.update(i, rowArray(i).trim.toString)
}
  }
  Array(transformedArray.deep.mkString(","))
}
  }
  catch
{

  case _: Throwable => {
println("Failure in transforming the file, 1 line, Around Line 131")
None
  }

}

}).coalesce(1, true).mapPartitions( it => (Seq(headerLine.value) ++ 
it).iterator,true).coalesce(500)

//Save the Transformed Data File
transformedData.saveAsTextFile(outputFileLocation)


Any idea how I can resolve this error?
Previous stages have completed successfully.
Thank You!
Vinay
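
As an aside on the snippet above: the coalesce(1, true) step funnels the whole
dataset through a single partition just to prepend the header. A sketch of one
way to keep the data partitioned while still writing the header first; here
transformedData stands for the RDD before the coalesce(1, true) step, the other
names are reused from the snippet, and none of this is offered as a diagnosis
of the OOM:

// Prepend the header only on the first partition instead of coalescing to 1.
val withHeader = transformedData.mapPartitionsWithIndex { (idx, it) =>
  if (idx == 0) Iterator(headerLine.value) ++ it else it
}
withHeader.saveAsTextFile(outputFileLocation)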



Prior Stages

   val dataFile = sc.textFile(args(1),500)
//Get the first line which is the header, which would also contain the 
column type
val columnDefinition = dataFile.first
val headerLine = sc.broadcast(columnDefinition)
val header = sc.broadcast(columnDefinition.split(",", -1))
//  Remove the Header
val modifiedDataFile = dataFile.filter(line = line != headerLine.value)
val onlySplitFile = modifiedDataFile.flatMap(line =>
  {
try {
  //println(line.split(' ').length)
  //println(line.split(' '))
  if (line.split(',').length < 1 || 
line.split(',').sameElements(Array(""))) {
None
  } else {
Array(line.split(",", -1))
  }

} catch {
  case _: Throwable => None
}
  })
modifiedDataFile.unpersist(true)

val currentColumn = sc.broadcast(i)
val distinctValues = onlySplitFile.flatMap(rowArray =>
  {
try {
  

Spark streaming and rate limit

2014-06-18 Thread Flavio Pompermaier
Hi to all,
in my use case I'd like to receive events and call an external service as
they pass through. Is it possible to limit the number of contemporaneous
call to that service (to avoid DoS) using Spark streaming? if so, limiting
the rate implies a possible buffer growth...how can I control the buffer of
incoming events waiting to be processed?

Best,
Flavio


Re: Issue while trying to aggregate with a sliding window

2014-06-18 Thread Hatch M
Ok that patch does fix the key lookup exception. However, curious about the
time validity check..isValidTime (
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L264
)

Why does (time - zerotime) have to be a multiple of slide duration ?
Shouldn't the reduceByKeyAndWindow aggregate every record in a given window
(zeroTime to zeroTime+windowDuration)?


On Tue, Jun 17, 2014 at 10:55 PM, Hatch M hatchman1...@gmail.com wrote:

 Thanks! Will try to get the fix and retest.


 On Tue, Jun 17, 2014 at 5:30 PM, onpoq l onpo...@gmail.com wrote:

 There is a bug:

 https://github.com/apache/spark/pull/961#issuecomment-45125185


 On Tue, Jun 17, 2014 at 8:19 PM, Hatch M hatchman1...@gmail.com wrote:
  Trying to aggregate over a sliding window, playing with the slide
 duration.
  Playing around with the slide interval I can see the aggregation works
 but
  mostly fails with the below error. The stream has records coming in at
  100ms.
 
  JavaPairDStream<String, AggregateObject> aggregatedDStream =
  pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(6), new
  Duration(60));
 
  14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is
  invalid as zeroTime is 1403050485800 ms and slideDuration is 6 ms
 and
  difference is 1100 ms
  14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found:
  1403050486900 ms
  java.util.NoSuchElementException: key not found: 1403050486900 ms
  at scala.collection.MapLike$class.default(MapLike.scala:228)
 
  Any hints on whats going on here?
  Thanks!
  Hatch
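
For context on the isValidTime check discussed above: in Spark Streaming both
the window duration and the slide duration must be whole multiples of the
source DStream's batch interval. A minimal sketch with durations chosen that
way (the host/port and the 1-second batch interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("WindowedCounts")
val ssc = new StreamingContext(conf, Seconds(1))          // 1 s batch interval

val pairs = ssc.socketTextStream("localhost", 9999).map(w => (w, 1L))

// 60 s window sliding every 10 s: both are multiples of the 1 s batch
// interval, which is the invariant the isValidTime check relies on.
val counts = pairs.reduceByKeyAndWindow((a: Long, b: Long) => a + b,
                                        Seconds(60), Seconds(10))
counts.print()

ssc.start()
ssc.awaitTermination()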
 





Re: Spark streaming and rate limit

2014-06-18 Thread Soumya Simanta

You can add a back-pressure-enabled component in front that feeds data into 
Spark. This component can control the input rate into Spark. 

 On Jun 18, 2014, at 6:13 PM, Flavio Pompermaier pomperma...@okkam.it wrote:
 
 Hi to all,
 in my use case I'd like to receive events and call an external service as 
 they pass through. Is it possible to limit the number of contemporaneous call 
 to that service (to avoid DoS) using Spark streaming? if so, limiting the 
 rate implies a possible buffer growth...how can I control the buffer of 
 incoming events waiting to be processed?
 
 Best,
 Flavio 


Re: Spark streaming and rate limit

2014-06-18 Thread Flavio Pompermaier
Thanks for the quick reply soumya. Unfortunately I'm a newbie with
Spark..what do you mean? is there any reference to how to do that?

On Thu, Jun 19, 2014 at 12:24 AM, Soumya Simanta soumya.sima...@gmail.com
wrote:


 You can add a back pressured enabled component in front that feeds data
 into Spark. This component can control in input rate to spark.

  On Jun 18, 2014, at 6:13 PM, Flavio Pompermaier pomperma...@okkam.it
 wrote:
 
  Hi to all,
  in my use case I'd like to receive events and call an external service
 as they pass through. Is it possible to limit the number of contemporaneous
 call to that service (to avoid DoS) using Spark streaming? if so, limiting
 the rate implies a possible buffer growth...how can I control the buffer of
 incoming events waiting to be processed?
 
  Best,
  Flavio



create SparkContext dynamically

2014-06-18 Thread jamborta
Hi all,

I am setting up a system where spark contexts would be created by a web
server that would handle the computation and return the results. I have the
following code (in python)

os.environ['SPARK_HOME'] = "/home/spark/spark-1.0.0-bin-hadoop2/"
sc = SparkContext(master="spark://ip-xx-xx-xx-xx:7077", appName="Simple App")
l = sc.parallelize([1,2,3,4])
c = l.count() 

but it throws an unrelated error 'TypeError: an integer is required' in the
last line.

I assume I did not setup the environment properly. I have added spark_home
and py4j source to the classpath. not sure what is missing.

thanks,





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/create-SparkContext-dynamically-tp7872.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
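
A minimal sketch of creating a SparkContext from another Python process using
SparkConf; the master URL, SPARK_HOME path, and app name are placeholders, and
this is not offered as a diagnosis of the TypeError above:

import os
from pyspark import SparkConf, SparkContext

os.environ["SPARK_HOME"] = "/home/spark/spark-1.0.0-bin-hadoop2/"   # placeholder

conf = (SparkConf()
        .setMaster("spark://ip-xx-xx-xx-xx:7077")                   # placeholder
        .setAppName("Simple App"))
sc = SparkContext(conf=conf)

print sc.parallelize([1, 2, 3, 4]).count()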


Trailing Tasks Saving to HDFS

2014-06-18 Thread Surendranauth Hiraman
I have a flow that ends with saveAsTextFile() to HDFS.

It seems all the expected files per partition have been written out, based
on the number of part files and the file sizes.

But the driver logs show 2 tasks still not completed and has no activity
and the worker logs show no activity for those two tasks for a while now.

Has anyone run into this situation? It's happened to me a couple of times
now.

Thanks.

-- Suren

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io


Patterns for making multiple aggregations in one pass

2014-06-18 Thread Nick Chammas
The following is a simplified example of what I am trying to accomplish.

Say I have an RDD of objects like this:

{
    "country": "USA",
    "name": "Franklin",
    "age": 24,
    "hits": 224
}
{
    "country": "USA",
    "name": "Bob",
    "age": 55,
    "hits": 108
}
{
    "country": "France",
    "name": "Remi",
    "age": 33,
    "hits": 72
}

I want to find the average age and total number of hits per country.
Ideally, I would like to scan the data once and perform both aggregations
simultaneously.

What is a good approach to doing this?

I’m thinking that we’d want to keyBy(country), and then somehow
reduceByKey(). The problem is, I don’t know how to approach writing a
function that can be passed to reduceByKey() and that will track a running
average and total simultaneously.

Nick
​




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Patterns-for-making-multiple-aggregations-in-one-pass-tp7874.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Doris Xin
Hi Nick,

Instead of using reduceByKey(), you might want to look into using
aggregateByKey(), which allows you to return a different value type U
instead of the input value type V for each input tuple (K, V). You can
define U to be a datatype that holds both the average and total and have
seqOp update both fields of U in a single pass.

Hope this makes sense,
Doris


On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas nicholas.cham...@gmail.com
wrote:

 The following is a simplified example of what I am trying to accomplish.

 Say I have an RDD of objects like this:

 {
 country: USA,
 name: Franklin,
 age: 24,
 hits: 224}
 {

 country: USA,
 name: Bob,
 age: 55,
 hits: 108}
 {

 country: France,
 name: Remi,
 age: 33,
 hits: 72}

 I want to find the average age and total number of hits per country.
 Ideally, I would like to scan the data once and perform both aggregations
 simultaneously.

 What is a good approach to doing this?

 I’m thinking that we’d want to keyBy(country), and then somehow
 reduceByKey(). The problem is, I don’t know how to approach writing a
 function that can be passed to reduceByKey() and that will track a
 running average and total simultaneously.

 Nick
 ​

 --
 View this message in context: Patterns for making multiple aggregations
 in one pass
 http://apache-spark-user-list.1001560.n3.nabble.com/Patterns-for-making-multiple-aggregations-in-one-pass-tp7874.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
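
A minimal sketch of the aggregateByKey approach Doris describes, assuming a
Spark build that already includes the newly merged aggregateByKey (it is not
in the 1.0.0 release); records below is assumed to be an RDD[Record] mirroring
the objects above:

case class Record(country: String, name: String, age: Int, hits: Long)

// One pass per country: accumulate (sum of ages, record count, sum of hits),
// then turn the sums into (average age, total hits).
val perCountry = records
  .map(r => (r.country, (r.age.toLong, 1L, r.hits)))
  .aggregateByKey((0L, 0L, 0L))(
    (acc, v) => (acc._1 + v._1, acc._2 + v._2, acc._3 + v._3),   // seqOp
    (a, b)   => (a._1 + b._1, a._2 + b._2, a._3 + b._3))         // combOp
  .mapValues { case (ageSum, n, hitSum) => (ageSum.toDouble / n, hitSum) }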



Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Nicholas Chammas
Ah, this looks like exactly what I need! It looks like this was recently added
into PySpark https://github.com/apache/spark/pull/705/files#diff-6 (and
Spark Core), but it's not in the 1.0.0 release.

Thank you.

Nick


On Wed, Jun 18, 2014 at 7:42 PM, Doris Xin doris.s@gmail.com wrote:

 Hi Nick,

 Instead of using reduceByKey(), you might want to look into using
 aggregateByKey(), which allows you to return a different value type U
 instead of the input value type V for each input tuple (K, V). You can
 define U to be a datatype that holds both the average and total and have
 seqOp update both fields of U in a single pass.

 Hope this makes sense,
 Doris


 On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 The following is a simplified example of what I am trying to accomplish.

 Say I have an RDD of objects like this:

 {
 country: USA,
 name: Franklin,
 age: 24,
 hits: 224}
 {

 country: USA,
 name: Bob,
 age: 55,
 hits: 108}
 {

 country: France,
 name: Remi,
 age: 33,
 hits: 72}

 I want to find the average age and total number of hits per country.
 Ideally, I would like to scan the data once and perform both aggregations
 simultaneously.

 What is a good approach to doing this?

 I’m thinking that we’d want to keyBy(country), and then somehow
 reduceByKey(). The problem is, I don’t know how to approach writing a
 function that can be passed to reduceByKey() and that will track a
 running average and total simultaneously.

 Nick
 ​

 --
 View this message in context: Patterns for making multiple aggregations
 in one pass
 http://apache-spark-user-list.1001560.n3.nabble.com/Patterns-for-making-multiple-aggregations-in-one-pass-tp7874.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.





Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Evan R. Sparks
This looks like a job for SparkSQL!


val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
case class MyRecord(country: String, name: String, age: Int, hits: Long)
val data = sc.parallelize(Array(MyRecord("USA", "Franklin", 24, 234),
MyRecord("USA", "Bob", 55, 108), MyRecord("France", "Remi", 33, 72)))
data.registerAsTable("MyRecords")
val results = sql("SELECT t.country, AVG(t.age), SUM(t.hits) FROM
MyRecords t GROUP BY t.country").collect

Now results contains:

Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342])


On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin doris.s@gmail.com wrote:

 Hi Nick,

 Instead of using reduceByKey(), you might want to look into using
 aggregateByKey(), which allows you to return a different value type U
 instead of the input value type V for each input tuple (K, V). You can
 define U to be a datatype that holds both the average and total and have
 seqOp update both fields of U in a single pass.

 Hope this makes sense,
 Doris


 On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 The following is a simplified example of what I am trying to accomplish.

 Say I have an RDD of objects like this:

 {
 country: USA,
 name: Franklin,
 age: 24,
 hits: 224}
 {

 country: USA,
 name: Bob,
 age: 55,
 hits: 108}
 {

 country: France,
 name: Remi,
 age: 33,
 hits: 72}

 I want to find the average age and total number of hits per country.
 Ideally, I would like to scan the data once and perform both aggregations
 simultaneously.

 What is a good approach to doing this?

 I’m thinking that we’d want to keyBy(country), and then somehow
 reduceByKey(). The problem is, I don’t know how to approach writing a
 function that can be passed to reduceByKey() and that will track a
 running average and total simultaneously.

 Nick
 ​

 --
 View this message in context: Patterns for making multiple aggregations
 in one pass
 http://apache-spark-user-list.1001560.n3.nabble.com/Patterns-for-making-multiple-aggregations-in-one-pass-tp7874.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.





Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Matei Zaharia
I was going to suggest the same thing :).

On Jun 18, 2014, at 4:56 PM, Evan R. Sparks evan.spa...@gmail.com wrote:

 This looks like a job for SparkSQL!
 
 
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext._
 case class MyRecord(country: String, name: String, age: Int, hits: Long)
 val data = sc.parallelize(Array(MyRecord(USA, Franklin, 24, 234), 
 MyRecord(USA, Bob, 55, 108), MyRecord(France, Remi, 33, 72)))
 data.registerAsTable(MyRecords)
 val results = sql(SELECT t.country, AVG(t.age), SUM(t.hits) FROM MyRecords 
 t GROUP BY t.country).collect
 
 Now results contains:
 Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342])
 
 
 
 On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin doris.s@gmail.com wrote:
 Hi Nick,
 
 Instead of using reduceByKey(), you might want to look into using 
 aggregateByKey(), which allows you to return a different value type U instead 
 of the input value type V for each input tuple (K, V). You can define U to be 
 a datatype that holds both the average and total and have seqOp update both 
 fields of U in a single pass.
 
 Hope this makes sense,
 Doris
 
 
 On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas nicholas.cham...@gmail.com 
 wrote:
 The following is a simplified example of what I am trying to accomplish.
 
 Say I have an RDD of objects like this:
 
 {
 country: USA,
 name: Franklin,
 age: 24,
 hits: 224
 }
 {
 
 country: USA,
 name: Bob,
 age: 55,
 hits: 108
 }
 {
 
 country: France,
 name: Remi,
 age: 33,
 hits: 72
 }
 I want to find the average age and total number of hits per country. Ideally, 
 I would like to scan the data once and perform both aggregations 
 simultaneously.
 
 What is a good approach to doing this?
 
 I’m thinking that we’d want to keyBy(country), and then somehow 
 reduceByKey(). The problem is, I don’t know how to approach writing a 
 function that can be passed to reduceByKey() and that will track a running 
 average and total simultaneously.
 
 Nick
 
 ​
 
 View this message in context: Patterns for making multiple aggregations in 
 one pass
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 



Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Nicholas Chammas
That’s pretty neat! So I guess if you start with an RDD of objects, you’d
first do something like RDD.map(lambda x: Record(x['field_1'],
x['field_2'], ...)) in order to register it as a table, and from there run
your aggregates. Very nice.
​


On Wed, Jun 18, 2014 at 7:56 PM, Evan R. Sparks evan.spa...@gmail.com
wrote:

 This looks like a job for SparkSQL!


 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext._
 case class MyRecord(country: String, name: String, age: Int, hits: Long)
 val data = sc.parallelize(Array(MyRecord(USA, Franklin, 24, 234),
 MyRecord(USA, Bob, 55, 108), MyRecord(France, Remi, 33, 72)))
 data.registerAsTable(MyRecords)
 val results = sql(SELECT t.country, AVG(t.age), SUM(t.hits) FROM
 MyRecords t GROUP BY t.country).collect

 Now results contains:

 Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342])


 On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin doris.s@gmail.com wrote:

 Hi Nick,

 Instead of using reduceByKey(), you might want to look into using
 aggregateByKey(), which allows you to return a different value type U
 instead of the input value type V for each input tuple (K, V). You can
 define U to be a datatype that holds both the average and total and have
 seqOp update both fields of U in a single pass.

 Hope this makes sense,
 Doris


 On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas nicholas.cham...@gmail.com
  wrote:

 The following is a simplified example of what I am trying to accomplish.

 Say I have an RDD of objects like this:

 {
 country: USA,
 name: Franklin,
 age: 24,
 hits: 224}
 {

 country: USA,
 name: Bob,
 age: 55,
 hits: 108}
 {

 country: France,
 name: Remi,
 age: 33,
 hits: 72}

 I want to find the average age and total number of hits per country.
 Ideally, I would like to scan the data once and perform both aggregations
 simultaneously.

 What is a good approach to doing this?

 I’m thinking that we’d want to keyBy(country), and then somehow
 reduceByKey(). The problem is, I don’t know how to approach writing a
 function that can be passed to reduceByKey() and that will track a
 running average and total simultaneously.

 Nick
 ​

 --
 View this message in context: Patterns for making multiple aggregations
 in one pass
 http://apache-spark-user-list.1001560.n3.nabble.com/Patterns-for-making-multiple-aggregations-in-one-pass-tp7874.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.






Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Zongheng Yang
If your input data is JSON, you can also try out the recently merged
in initial JSON support:
https://github.com/apache/spark/commit/d2f4f30b12f99358953e2781957468e2cfe3c916

On Wed, Jun 18, 2014 at 5:27 PM, Nicholas Chammas
nicholas.cham...@gmail.com wrote:
 That’s pretty neat! So I guess if you start with an RDD of objects, you’d
 first do something like RDD.map(lambda x: Record(x['field_1'], x['field_2'],
 ...)) in order to register it as a table, and from there run your
 aggregates. Very nice.



 On Wed, Jun 18, 2014 at 7:56 PM, Evan R. Sparks evan.spa...@gmail.com
 wrote:

 This looks like a job for SparkSQL!


 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext._
 case class MyRecord(country: String, name: String, age: Int, hits: Long)
 val data = sc.parallelize(Array(MyRecord(USA, Franklin, 24, 234),
 MyRecord(USA, Bob, 55, 108), MyRecord(France, Remi, 33, 72)))
 data.registerAsTable(MyRecords)
 val results = sql(SELECT t.country, AVG(t.age), SUM(t.hits) FROM
 MyRecords t GROUP BY t.country).collect

 Now results contains:

 Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342])



 On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin doris.s@gmail.com wrote:

 Hi Nick,

 Instead of using reduceByKey(), you might want to look into using
 aggregateByKey(), which allows you to return a different value type U
 instead of the input value type V for each input tuple (K, V). You can
 define U to be a datatype that holds both the average and total and have
 seqOp update both fields of U in a single pass.

 Hope this makes sense,
 Doris


 On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas
 nicholas.cham...@gmail.com wrote:

 The following is a simplified example of what I am trying to accomplish.

 Say I have an RDD of objects like this:

 {
 country: USA,
 name: Franklin,
 age: 24,
 hits: 224
 }
 {

 country: USA,
 name: Bob,
 age: 55,
 hits: 108
 }
 {

 country: France,
 name: Remi,
 age: 33,
 hits: 72
 }

 I want to find the average age and total number of hits per country.
 Ideally, I would like to scan the data once and perform both aggregations
 simultaneously.

 What is a good approach to doing this?

 I’m thinking that we’d want to keyBy(country), and then somehow
 reduceByKey(). The problem is, I don’t know how to approach writing a
 function that can be passed to reduceByKey() and that will track a running
 average and total simultaneously.

 Nick


 
 View this message in context: Patterns for making multiple aggregations
 in one pass
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
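
A minimal sketch of the JSON route Zongheng mentions, assuming a build that
includes the newly merged JSON support and the jsonRDD/jsonFile methods from
the linked patch (not in the 1.0.0 release); the field names mirror the
records above:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

// Each element is one JSON document; the schema is inferred automatically.
val jsonRecords = sc.parallelize(Seq(
  """{"country": "USA", "name": "Franklin", "age": 24, "hits": 224}""",
  """{"country": "USA", "name": "Bob", "age": 55, "hits": 108}""",
  """{"country": "France", "name": "Remi", "age": 33, "hits": 72}"""))

val people = sqlContext.jsonRDD(jsonRecords)
people.registerAsTable("records")

val results = sql("SELECT country, AVG(age), SUM(hits) FROM records GROUP BY country").collect()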






Re: Trailing Tasks Saving to HDFS

2014-06-18 Thread Surendranauth Hiraman
Looks like eventually there was some type of reset or timeout and the tasks
have been reassigned. I'm guessing they'll keep failing until max failure
count.

The machine it disconnected from was a remote machine, though I've seen
such failures from connections to itself with other problems. The log lines
from the remote machine are also below.

Any thoughts or guesses would be appreciated!

*HUNG WORKER*

14/06/18 19:41:18 WARN network.ReceivingConnection: Error reading from
connection to ConnectionManagerId(172.16.25.103,57626)

java.io.IOException: Connection reset by peer

at sun.nio.ch.FileDispatcher.read0(Native Method)

at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)

at sun.nio.ch.IOUtil.read(IOUtil.java:224)

at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)

at org.apache.spark.network.ReceivingConnection.read(Connection.scala:496)

at
org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)

at java.lang.Thread.run(Thread.java:679)

14/06/18 19:41:18 INFO network.ConnectionManager: Handling connection error
on connection to ConnectionManagerId(172.16.25.103,57626)

14/06/18 19:41:18 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(172.16.25.103,57626)

14/06/18 19:41:18 INFO network.ConnectionManager: Removing
SendingConnection to ConnectionManagerId(172.16.25.103,57626)

14/06/18 19:41:18 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(172.16.25.103,57626)

14/06/18 19:41:18 ERROR network.ConnectionManager: Corresponding
SendingConnectionManagerId not found


*REMOTE WORKER*

14/06/18 19:41:18 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(172.16.25.124,55610)

14/06/18 19:41:18 ERROR network.ConnectionManager: Corresponding
SendingConnectionManagerId not found



On Wed, Jun 18, 2014 at 7:16 PM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 I have a flow that ends with saveAsTextFile() to HDFS.

 It seems all the expected files per partition have been written out, based
 on the number of part files and the file sizes.

 But the driver logs show 2 tasks still not completed and has no activity
 and the worker logs show no activity for those two tasks for a while now.

 Has anyone run into this situation? It's happened to me a couple of times
 now.

 Thanks.

 -- Suren

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
  E: suren.hira...@velos.io
 W: www.velos.io




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io


Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Nicholas Chammas
This is exciting! Here is the relevant alpha doc
http://yhuai.github.io/site/sql-programming-guide.html#json-datasets for
this feature, for others reading this. I'm going to try this out.

Will this be released with 1.1.0?


On Wed, Jun 18, 2014 at 8:31 PM, Zongheng Yang zonghen...@gmail.com wrote:

 If your input data is JSON, you can also try out the recently merged
 in initial JSON support:

 https://github.com/apache/spark/commit/d2f4f30b12f99358953e2781957468e2cfe3c916

 On Wed, Jun 18, 2014 at 5:27 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  That’s pretty neat! So I guess if you start with an RDD of objects, you’d
  first do something like RDD.map(lambda x: Record(x['field_1'],
 x['field_2'],
  ...)) in order to register it as a table, and from there run your
  aggregates. Very nice.
 
 
 
  On Wed, Jun 18, 2014 at 7:56 PM, Evan R. Sparks evan.spa...@gmail.com
  wrote:
 
  This looks like a job for SparkSQL!
 
 
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext._
  case class MyRecord(country: String, name: String, age: Int, hits: Long)
  val data = sc.parallelize(Array(MyRecord(USA, Franklin, 24, 234),
  MyRecord(USA, Bob, 55, 108), MyRecord(France, Remi, 33, 72)))
  data.registerAsTable(MyRecords)
  val results = sql(SELECT t.country, AVG(t.age), SUM(t.hits) FROM
  MyRecords t GROUP BY t.country).collect
 
  Now results contains:
 
  Array[org.apache.spark.sql.Row] = Array([France,33.0,72],
 [USA,39.5,342])
 
 
 
  On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin doris.s@gmail.com
 wrote:
 
  Hi Nick,
 
  Instead of using reduceByKey(), you might want to look into using
  aggregateByKey(), which allows you to return a different value type U
  instead of the input value type V for each input tuple (K, V). You can
  define U to be a datatype that holds both the average and total and
 have
  seqOp update both fields of U in a single pass.
 
  Hope this makes sense,
  Doris
 
 
  On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas
  nicholas.cham...@gmail.com wrote:
 
  The following is a simplified example of what I am trying to
 accomplish.
 
  Say I have an RDD of objects like this:
 
  {
  country: USA,
  name: Franklin,
  age: 24,
  hits: 224
  }
  {
 
  country: USA,
  name: Bob,
  age: 55,
  hits: 108
  }
  {
 
  country: France,
  name: Remi,
  age: 33,
  hits: 72
  }
 
  I want to find the average age and total number of hits per country.
  Ideally, I would like to scan the data once and perform both
 aggregations
  simultaneously.
 
  What is a good approach to doing this?
 
  I’m thinking that we’d want to keyBy(country), and then somehow
  reduceByKey(). The problem is, I don’t know how to approach writing a
  function that can be passed to reduceByKey() and that will track a
 running
  average and total simultaneously.
 
  Nick
 
 
  
  View this message in context: Patterns for making multiple
 aggregations
  in one pass
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
 
 
 



Re: Spark streaming and rate limit

2014-06-18 Thread Soumya Simanta
Flavio - i'm new to Spark as well but I've done stream processing using
other frameworks. My comments below are not spark-streaming specific. Maybe
someone who knows more can provide better insights.

I read your post on my phone and I believe my answer doesn't completely
address the issue you have raised.

Do you need to call the external service for every event ? i.e., do you
need to process all events ? Also does order of processing events matter?
Is there a time bound in which each event should be processed?

Calling an external service means network IO. So you have to buffer events
if your service is rate limited or slower than the rate at which you are
processing your events.

Here are some ways of dealing with this situation:

1. Drop events based on a policy (such as buffer/queue size),
2. Tell the event producer to slow down if that's in your control
3. Use a proxy or a set of proxies to distribute the calls to the remote
service, if the rate limit is by user or network node only.

I'm not sure how many of these are implemented directly in Spark streaming
but you can have an external component that can
control the rate of events and only send events to the Spark stream when it is
ready to process more messages.

Hope this helps.

-Soumya




On Wed, Jun 18, 2014 at 6:50 PM, Flavio Pompermaier pomperma...@okkam.it
wrote:

 Thanks for the quick reply soumya. Unfortunately I'm a newbie with
 Spark..what do you mean? is there any reference to how to do that?


 On Thu, Jun 19, 2014 at 12:24 AM, Soumya Simanta soumya.sima...@gmail.com
  wrote:


 You can add a back pressured enabled component in front that feeds data
 into Spark. This component can control in input rate to spark.

  On Jun 18, 2014, at 6:13 PM, Flavio Pompermaier pomperma...@okkam.it
 wrote:
 
  Hi to all,
  in my use case I'd like to receive events and call an external service
 as they pass through. Is it possible to limit the number of contemporaneous
 call to that service (to avoid DoS) using Spark streaming? if so, limiting
 the rate implies a possible buffer growth...how can I control the buffer of
 incoming events waiting to be processed?
 
  Best,
  Flavio
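
A minimal, framework-agnostic sketch of the "component in front" idea from
this thread: a bounded queue that refuses events when full, plus a crude
per-second cap on calls to the external service. The names, sizes, and rate
are illustrative, and none of this is Spark Streaming API:

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Bounded buffer between the event source and whatever feeds Spark Streaming.
// offer() returns false when the buffer is full, so the producer can drop or
// slow down instead of letting memory grow without bound.
val buffer = new ArrayBlockingQueue[String](10000)

def onEvent(event: String): Unit = {
  if (!buffer.offer(event, 50, TimeUnit.MILLISECONDS)) {
    // Back-pressure policy goes here: drop, log, or signal the producer.
  }
}

// Consumer side: at most maxCallsPerSecond calls to the external service.
val maxCallsPerSecond = 100
new Thread(new Runnable {
  def run(): Unit = {
    while (true) {
      val start = System.currentTimeMillis()
      var calls = 0
      while (calls < maxCallsPerSecond &&
             System.currentTimeMillis() - start < 1000) {
        val event = buffer.poll(10, TimeUnit.MILLISECONDS)
        if (event != null) {
          // callExternalService(event)   // placeholder for the real call
          calls += 1
        }
      }
      val elapsed = System.currentTimeMillis() - start
      if (elapsed < 1000) Thread.sleep(1000 - elapsed)
    }
  }
}).start()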




Re: Execution stalls in LogisticRegressionWithSGD

2014-06-18 Thread Xiangrui Meng
Hi Bharath,

This is related to SPARK-1112, which we already found the root cause.
I will let you know when this is fixed.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar reachb...@gmail.com wrote:
 Couple more points:
 1)The inexplicable stalling of execution with large feature sets appears
 similar to that reported with the news-20 dataset:
 http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E

 2) The NPE when trying to call mapToPair to convert an RDD<Long, Long, Integer,
 Integer> into a JavaPairRDD<Tuple2<Long,Long>, Tuple2<Integer,Integer>> is
 unrelated to mllib.

 Thanks,
 Bharath



 On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:

 Hi  Xiangrui ,

 I'm using 1.0.0.

 Thanks,
 Bharath

 On 18-Jun-2014 1:43 am, Xiangrui Meng men...@gmail.com wrote:

 Hi Bharath,

 Thanks for posting the details! Which Spark version are you using?

 Best,
 Xiangrui

 On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Hi,
 
  (Apologies for the long mail, but it's necessary to provide sufficient
  details considering the number of issues faced.)
 
  I'm running into issues testing LogisticRegressionWithSGD a two node
  cluster
  (each node with 24 cores and 16G available to slaves out of 24G on the
  system). Here's a description of the application:
 
  The model is being trained based on categorical features x, y, and
  (x,y).
  The categorical features are mapped to binary features by converting
  each
  distinct value in the category enum into a binary feature by itself
  (i.e
  presence of that value in a record implies corresponding feature = 1,
  else
  feature = 0. So, there'd be as many distinct features as enum values) .
  The
  training vector is laid out as
  [x1,x2...xn,y1,y2yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
  training data has only one combination (Xk,Yk) and a label appearing in
  the
  record. Thus, the corresponding labeledpoint sparse vector would only
  have 3
  values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector
   (though sparse) would be nearly 614000.  The number of records is about
  1.33
  million. The records have been coalesced into 20 partitions across two
  nodes. The input data has not been cached.
  (NOTE: I do realize the records  features may seem large for a two
  node
  setup, but given the memory  cpu, and the fact that I'm willing to
  give up
  some turnaround time, I don't see why tasks should inexplicably fail)
 
  Additional parameters include:
 
  spark.executor.memory = 14G
  spark.default.parallelism = 1
  spark.cores.max=20
  spark.storage.memoryFraction=0.8 //No cache space required
  (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't
  help
  either)
 
  The model training was initialized as : new
  LogisticRegressionWithSGD(1,
  maxIterations, 0.0, 0.05)
 
  However, after 4 iterations of gradient descent, the entire execution
  appeared to stall inexplicably. The corresponding executor details and
  details of the stalled stage (number 14) are as follows:
 
   Metric                             Min      25th     Median   75th     Max
   Result serialization time          12 ms    13 ms    14 ms    16 ms    18 ms
   Duration                           4 s      4 s      5 s      5 s      5 s
   Time spent fetching task results   0 ms     0 ms     0 ms     0 ms     0 ms
   Scheduler delay                    6 s      6 s      6 s      6 s      12 s
 
 
  Stage Id
  14 aggregate at GradientDescent.scala:178
 
   Task Index   Task ID   Status   Locality Level   Executor
   Launch Time   Duration   GC Time   Result Ser Time   Errors
 
  0 600 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  1 601 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  2 602 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  3 603 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  4 604 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  5 605 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 2 s 12 ms
  6 606 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 1 s 14 ms
  7 607 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 2 s 12 ms
  8 608 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 15 ms
  9 609 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 14 ms
  10 610 SUCCESS PROCESS_LOCAL
  serious.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 15 ms
  11 611 SUCCESS PROCESS_LOCAL  

Re: Execution stalls in LogisticRegressionWithSGD

2014-06-18 Thread Bharath Ravi Kumar
Thanks. I'll await the fix to re-run my test.


On Thu, Jun 19, 2014 at 8:28 AM, Xiangrui Meng men...@gmail.com wrote:

 Hi Bharath,

 This is related to SPARK-1112, which we already found the root cause.
 I will let you know when this is fixed.

 Best,
 Xiangrui

 On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Couple more points:
  1)The inexplicable stalling of execution with large feature sets appears
  similar to that reported with the news-20 dataset:
 
 http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E
 
  2) The NPE trying to call mapToPair convert an RDDLong, Long, Integer,
  Integer into a JavaPairRDDTuple2Long,Long, Tuple2Integer,Integer
 is
  unrelated to mllib.
 
  Thanks,
  Bharath
 
 
 
  On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar reachb...@gmail.com
 
  wrote:
 
  Hi  Xiangrui ,
 
  I'm using 1.0.0.
 
  Thanks,
  Bharath
 
  On 18-Jun-2014 1:43 am, Xiangrui Meng men...@gmail.com wrote:
 
  Hi Bharath,
 
  Thanks for posting the details! Which Spark version are you using?
 
  Best,
  Xiangrui
 
  On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar 
 reachb...@gmail.com
  wrote:
   Hi,
  
   (Apologies for the long mail, but it's necessary to provide
 sufficient
   details considering the number of issues faced.)
  
   I'm running into issues testing LogisticRegressionWithSGD a two node
   cluster
   (each node with 24 cores and 16G available to slaves out of 24G on
 the
   system). Here's a description of the application:
  
   The model is being trained based on categorical features x, y, and
   (x,y).
   The categorical features are mapped to binary features by converting
   each
   distinct value in the category enum into a binary feature by itself
    (i.e.
    presence of that value in a record implies the corresponding feature = 1,
    else
    feature = 0; so there'd be as many distinct features as enum
  values).
   The
   training vector is laid out as
    [x1,x2...xn,y1,y2...yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in
 the
   training data has only one combination (Xk,Yk) and a label appearing
 in
   the
   record. Thus, the corresponding labeledpoint sparse vector would only
   have 3
   values Xk, Yk, (Xk,Yk) set for a record. The total length of the
 vector
   (though parse) would be nearly 614000.  The number of records is
 about
   1.33
   million. The records have been coalesced into 20 partitions across
 two
   nodes. The input data has not been cached.
    (NOTE: I do realize the records & features may seem large for a two
    node
    setup, but given the memory & cpu, and the fact that I'm willing to
   give up
   some turnaround time, I don't see why tasks should inexplicably fail)
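
    A minimal sketch of the one-hot encoding described above, using MLlib's
    Vectors.sparse and LabeledPoint (the numX/numY sizes and the encode helper
    are placeholders for illustration, not the actual code from this thread):

      import org.apache.spark.mllib.linalg.Vectors
      import org.apache.spark.mllib.regression.LabeledPoint

      // Toy category sizes; the vector length in this thread is ~614000.
      val numX = 100
      val numY = 100
      val numFeatures = numX + numY + numX * numY

      // Encode one record (xk, yk, label): only 3 of the numFeatures slots are
      // non-zero -- one for x, one for y, one for the (x, y) combination.
      def encode(xk: Int, yk: Int, label: Double): LabeledPoint = {
        val indices = Array(xk, numX + yk, numX + numY + xk * numY + yk)
        LabeledPoint(label, Vectors.sparse(numFeatures, indices, Array(1.0, 1.0, 1.0)))
      }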
  
   Additional parameters include:
  
   spark.executor.memory = 14G
   spark.default.parallelism = 1
   spark.cores.max=20
   spark.storage.memoryFraction=0.8 //No cache space required
   (Trying to set spark.akka.frameSize to a larger number, say, 20
 didn't
   help
   either)
  
    The model training was initialized as: new
   LogisticRegressionWithSGD(1,
   maxIterations, 0.0, 0.05)
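
    An equivalent way to start the same training is through the companion
    object's train helper; a sketch, assuming points is an RDD[LabeledPoint]
    of the sparse vectors described above, with stepSize = 1.0 and
    miniBatchFraction = 0.05 mirroring the constructor arguments quoted here
    (as far as I can tell, this overload leaves regParam at 0.0):

      import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
      import org.apache.spark.mllib.regression.LabeledPoint
      import org.apache.spark.rdd.RDD

      // points is assumed to hold the sparse training vectors.
      def train(points: RDD[LabeledPoint], maxIterations: Int) =
        LogisticRegressionWithSGD.train(points, maxIterations,
          1.0,   // stepSize
          0.05)  // miniBatchFraction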
  
   However, after 4 iterations of gradient descent, the entire execution
   appeared to stall inexplicably. The corresponding executor details
 and
   details of the stalled stage (number 14) are as follows:
  
    Metric                            Min      25th     Median   75th     Max
    Result serialization time         12 ms    13 ms    14 ms    16 ms    18 ms
    Duration                          4 s      4 s      5 s      5 s      5 s
    Time spent fetching task results  0 ms     0 ms     0 ms     0 ms     0 ms
    Scheduler delay                   6 s      6 s      6 s      6 s      12 s
  
  
    Stage Id: 14  aggregate at GradientDescent.scala:178
   
    Task Index  Task ID  Status   Locality Level  Executor                     Launch Time          Duration  GC Time  Result Ser Time  Errors
    0           600      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
    1           601      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
    2           602      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
    3           603      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
    4           604      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
    5           605      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
    6           606      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
    7           607      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
    8           608      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
    9           609      SUCCESS 

Fwd: BSP realization on Spark

2014-06-18 Thread Ghousia
-- Forwarded message --
From: Ghousia ghousia.ath...@gmail.com
Date: Wed, Jun 18, 2014 at 5:41 PM
Subject: BSP realization on Spark
To: user@spark.apache.org


Hi,

We are trying to implement a BSP model in Spark with the help of GraphX.
One thing I encountered is the Pregel operator in the Graph class. But what I
fail to understand is how the Master and Workers need to be assigned (as in BSP),
and how barrier synchronization would happen. The Pregel operator provides
a way to define a vertex program, but nothing is mentioned about
barrier synchronization.

Any help in this regard is truly appreciated.

Many Thanks,
Ghousia.
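
For what it's worth, in GraphX's Pregel there is no explicit master/worker
assignment or barrier call: the Spark driver plays the coordinating role, each
superstep runs as ordinary Spark jobs (messages are computed and aggregated,
then joined back into the vertices), and the barrier is implicit because the
next superstep only starts after those jobs finish. A sketch adapted from the
standard single-source shortest paths example (graph, with Double edge
weights, and sourceId are assumed to be defined):

  import org.apache.spark.graphx._

  // Initialize distances: 0 at the source, infinity everywhere else.
  val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)

  val sssp = initialGraph.pregel(Double.PositiveInfinity)(
    // vprog: applied to vertices that received a message in the previous superstep
    (id, dist, newDist) => math.min(dist, newDist),
    // sendMsg: run over edges of active vertices; emitted messages drive the next superstep
    triplet =>
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      else
        Iterator.empty,
    // mergeMsg: combines messages addressed to the same vertex
    (a, b) => math.min(a, b))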


options set in spark-env.sh is not reflecting on actual execution

2014-06-18 Thread MEETHU MATHEW
Hi all,

I have a question regarding the options in spark-env.sh. I set the following 
values in the file on the master and the 2 workers:

SPARK_WORKER_MEMORY=7g
SPARK_EXECUTOR_MEMORY=6g
SPARK_DAEMON_JAVA_OPTS+="-Dspark.akka.timeout=30 
-Dspark.akka.frameSize=1 -Dspark.blockManagerHeartBeatMs=80 
-Dspark.shuffle.spill=false"

But SPARK_EXECUTOR_MEMORY is showing as 4g in the web UI. Do I need to change it 
anywhere else for the value I set to take effect and show up in the web UI?

I am also getting a warning that blockManagerHeartBeatMs is exceeding 45 while 
executing a process, even though I set it to 80.

So I am not sure whether it should instead be set via SPARK_MASTER_OPTS or SPARK_WORKER_OPTS.
 
Thanks & Regards, 
Meethu M
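
One thing worth trying (a hedged suggestion, not a confirmed fix): in
standalone mode the per-application executor memory can also be requested from
the driver side via spark.executor.memory, which the master web UI reports per
application; SPARK_WORKER_MEMORY only caps what a worker can hand out. A
minimal sketch:

  import org.apache.spark.{SparkConf, SparkContext}

  // Request 6g per executor from the application itself; the worker's
  // SPARK_WORKER_MEMORY (7g here) still caps what can actually be granted.
  val conf = new SparkConf()
    .setAppName("executor-memory-test")   // hypothetical app name
    .set("spark.executor.memory", "6g")
  val sc = new SparkContext(conf)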

Re: Best practices for removing lineage of a RDD or Graph object?

2014-06-18 Thread dash
Hi Roy, 

Thanks for your help. I wrote a small code snippet that reproduces the 
problem.
Could you read through it and see if I did anything wrong?

Thanks!

  // Hypothetical enclosing object and imports added so the snippet is self-contained.
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.graphx.{Edge, Graph, VertexId}

  object GraphLineageTest {

    def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("TEST")
        .setMaster("local[4]")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "edu.nd.dsg.hdtm.util.HDTMKryoRegistrator")
      val sc = new SparkContext(conf)

      // A tiny 3-vertex / 3-edge cycle graph
      val v = sc.parallelize(Seq[(VertexId, Long)]((0L, 0L), (1L, 1L), (2L, 2L)))
      val e = sc.parallelize(Seq[Edge[Long]](Edge(0L, 1L, 0L), Edge(1L, 2L, 1L),
        Edge(2L, 0L, 2L)))
      val newGraph = Graph(v, e)
      var currentGraph = newGraph
      val vertexIds = currentGraph.vertices.map(_._1).collect()

      for (i <- 1 to 1000) {
        var g = currentGraph
        vertexIds.toStream.foreach(id => {
          // Rebuild a graph from the previous iteration's RDDs and materialize it
          g = Graph(currentGraph.vertices, currentGraph.edges)
          g.cache()
          g.edges.cache()
          g.vertices.cache()
          g.vertices.count()
          g.edges.count()
        })

        // Drop the previous graph's cached blocks before the next iteration
        currentGraph.unpersistVertices(blocking = false)
        currentGraph.edges.unpersist(blocking = false)
        currentGraph = g
        println("iter " + i + " finished")
      }
    }
  }
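
For completeness, a hedged sketch of how periodic checkpointing could be folded
into a loop like the one above to cut the lineage, following the
flag-then-materialize order mentioned in the quoted reply below. Whether this
actually truncates GraphX's internal lineage in this Spark version is exactly
what this thread is probing, so treat it as a starting point rather than a fix;
the checkpoint directory and interval are placeholders:

      // Continues the snippet above: sc and currentGraph are the same values.
      sc.setCheckpointDir("/tmp/spark-checkpoints")   // placeholder path

      val checkpointInterval = 20
      for (i <- 1 to 1000) {
        val g = Graph(currentGraph.vertices, currentGraph.edges)
        g.cache()
        if (i % checkpointInterval == 0) {
          // Flag the RDDs for checkpointing BEFORE materializing them,
          // otherwise the flag is ignored.
          g.vertices.checkpoint()
          g.edges.checkpoint()
        }
        // For a plain RDD, running a job after the flag performs the checkpoint.
        g.vertices.count()
        g.edges.count()

        currentGraph.unpersistVertices(blocking = false)
        currentGraph.edges.unpersist(blocking = false)
        currentGraph = g
      }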


Baoxu Shi(Dash)
Computer Science and Engineering Department
University of Notre Dame
b...@nd.edu



 On Jun 19, 2014, at 1:47 AM, roy20021 [via Apache Spark User List] 
 ml-node+s1001560n7892...@n3.nabble.com wrote:
 
 Not sure if it can help, btw:
 Checkpointing cuts the lineage. The checkpoint method only sets a flag. For the 
 checkpoint to actually be performed, you must NOT materialise the RDD before it 
 has been flagged; otherwise the flag is just ignored.
 
 rdd2 = rdd1.map(..)
 rdd2.checkpoint()
 rdd2.count
 rdd2.isCheckpointed // true
 
 On Wednesday, 18 June 2014, dash [hidden email] wrote:
  If an RDD object has non-empty .dependencies, does that mean it has
  lineage? How could I remove it?
 
  I'm doing iterative computing and each iteration depends on the result
  computed in the previous iteration. After several iterations, it throws a
  StackOverflowError.
 
  At first I tried to use cache. I read the code in pregel.scala, which is
  part of GraphX; they use a count method to materialize the object after
  cache, but I attached a debugger and it seems that approach does not empty
  .dependencies, and it also does not work in my code.
 
  An alternative approach is using checkpoint. I tried checkpointing the
  vertices and edges of my Graph object and then materializing them by counting
  vertices and edges. Then I used .isCheckpointed to check if it was correctly
  checkpointed, but it always returns false.
 
 
 
  --
  View this message in context: 
  http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-for-removing-lineage-of-a-RDD-or-Graph-object-tp7779.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
  
 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-for-removing-lineage-of-a-RDD-or-Graph-object-tp7779p7893.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.