Re: Rule Engine for Spark

2015-11-04 Thread Daniel Mahler
I am not familiar with any rule engines on Spark Streaming, or even on plain
Spark.
The conceptually closest things I am aware of are Datomic and Bloom-lang.
Neither of them is Spark based, but they implement Datalog-like languages
over distributed stores.

   - http://www.datomic.com/
   - http://bloom-lang.net/

There is somewhat of a mismatch between streaming data and rule based
systems since the preconditions of a rule can be satisfied by data that is
far apart in the stream.
This is further compounded by the fact that rules can chain arbitrarily,
potentially recursively.
Traditionally, practical rule based systems rely heavily on indexing and
agenda mechanisms like RETE, TREAT and LEAPS:

   - http://www.cs.utexas.edu/ftp/predator/tr-94-28.pdf
   - https://en.wikipedia.org/wiki/Rete_algorithm
   - http://www.cs.utexas.edu/~miranker/treator.htm

This entails keeping track of the data you have seen in the past.

I have not worked in this area for some time though and do not know if
there has been recent progress on this.

cheers
Daniel

On Wed, Nov 4, 2015 at 6:44 PM, Cheng, Hao  wrote:

> Or try Streaming SQL? It is a simple layer on top of Spark
> Streaming.
>
>
>
> https://github.com/Intel-bigdata/spark-streamingsql
>
>
>
>
>
> From: Cassa L [mailto:lcas...@gmail.com]
> Sent: Thursday, November 5, 2015 8:09 AM
> To: Adrian Tanase
> Cc: Stefano Baghino; user
> Subject: Re: Rule Engine for Spark
>
>
>
> Thanks for the reply. How about Drools? Does it work with Spark?
>
> LCassa
>
>
>
> On Wed, Nov 4, 2015 at 3:02 AM, Adrian Tanase  wrote:
>
> Another way to do it is to extract your filters as SQL code and load it in
> a transform – which allows you to change the filters at runtime.
>
>
>
> Inside the transform you could apply the filters by going RDD -> DF -> SQL
> -> RDD.
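
A rough sketch of that RDD -> DF -> SQL -> RDD pattern (the names are
hypothetical; it assumes Spark 1.5-style APIs, a DStream `events` of some
case class, and a WHERE clause string `filterSql` loaded at runtime):

import org.apache.spark.sql.SQLContext

val filtered = events.transform { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val df = rdd.toDF()                 // RDD -> DataFrame
  df.registerTempTable("events")      // expose it to SQL
  sqlContext.sql(s"SELECT * FROM events WHERE $filterSql").rdd   // back to RDD
}

Since the filter is just a string, it can be swapped out between batches
without redeploying the job.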
>
>
>
> Lastly, depending on how complex your filters are, you could skip SQL and
> create your own mini-DSL that runs inside transform. I’d definitely start
> here if the filter predicates are simple enough…
>
>
>
> -adrian
>
>
>
> From: Stefano Baghino
> Date: Wednesday, November 4, 2015 at 10:15 AM
> To: Cassa L
> Cc: user
> Subject: Re: Rule Engine for Spark
>
>
>
> Hi LCassa,
>
> unfortunately I don't have actual experience on this matter, however for a
> similar use case I have briefly evaluated Decision
>  (then called literally Streaming
> CEP Engine) and it looked interesting. I hope it may help.
>
>
>
> On Wed, Nov 4, 2015 at 1:42 AM, Cassa L  wrote:
>
> Hi,
>
>  Has anyone used a rule engine with Spark Streaming? I have a case where
> data is streaming from Kafka and I need to apply some rules on it (instead
> of hard-coding them in the code).
>
> Thanks,
>
> LCassa
>
>
>
>
>
> --
>
> BR,
>
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>
>
>


Error using json4s with Apache Spark in spark-shell

2015-06-11 Thread Daniel Mahler
This is a cross-post of a StackOverflow question that has not been answered.
I have just started a 50-point bounty on it there.
http://stackoverflow.com/questions/30744294/error-using-json4s-with-apache-spark-in-spark-shell
You can answer it there if you prefer.


I am trying to use the case class extraction feature of json4s in Spark,
i.e. calling `jvalue.extract[MyCaseClass]`. It works fine if I bring the
`JValue` objects into the master and do the extraction there, but the same
calls fail in the workers:

import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util.{Try, Success, Failure}

val sqx = sqlContext

val data = sc.textFile(inpath).coalesce(2000)

case class PageView(
  client:  Option[String]
)

def extract(json: JValue) = {
  implicit def formats = org.json4s.DefaultFormats
  Try(json.extract[PageView]).toOption
}

val json = data.map(parse(_)).sample(false, 1e-6).cache()

// count initial inputs
val raw = json.count

// count successful extractions locally -- same value as above
val loc = json.toLocalIterator.flatMap(extract).size

// distributed count -- always zero
val dist = json.flatMap(extract).count // always returns zero

// this throws org.json4s.package$MappingException: Parsed JSON
// values do not match with class constructor
json.map(x => {implicit def formats = org.json4s.DefaultFormats;
x.extract[PageView]}).count

The implicit for `Formats` is defined locally in the `extract` function
since DefaultFormats is not serializable, and defining it at top level
caused it to be serialized for transmission to the workers rather than
constructed there. I think the problem still has something to do with the
remote initialization of `DefaultFormats`, but I am not sure what it is.

When I call the `extract` method directly, instead of my `extract`
function, like in the last example, it no longer complains about
serialization but just throws an error that the JSON does not match the
expected structure.

How can I get the extraction to work when distributed to the workers?

Wesley Miao has reproduced the problem and found that it is specific to
spark-shell. He reports that this code works as a standalone application.

thanks
Daniel Mahler


tachyon on machines launched with spark-ec2 scripts

2015-04-24 Thread Daniel Mahler
I have a cluster launched with spark-ec2.
I can see a TachyonMaster process running,
but I do not seem to be able to use tachyon from the spark-shell.

if I try

rdd.saveAsTextFile("tachyon://localhost:19998/path")
I get

15/04/24 19:18:31 INFO TaskSetManager: Starting task 12.2 in stage 1.0 (TID
216, ip-10-63-69-48.ec2.internal, PROCESS_LOCAL, 1383 bytes)
15/04/24 19:18:31 WARN TaskSetManager: Lost task 32.2 in stage 1.0 (TID
177, ip-10-63-69-48.ec2.internal): java.io.IOException: Failed to connect
to master localhost/127.0.0.1:19998 after 5 attempts
at tachyon.client.TachyonFS.connect(TachyonFS.java:293)
at tachyon.client.TachyonFS.getUnderfsAddress(TachyonFS.java:1224)
at tachyon.hadoop.TFS.initialize(TFS.java:289)
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2262)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:86)
at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2296)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2278)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:316)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:194)
at
org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:83)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at java.lang.Thread.run(Thread.java:745)
Caused by: tachyon.org.apache.thrift.TException: Failed to connect to
master localhost/127.0.0.1:19998 after 5 attempts
at tachyon.master.MasterClient.connect(MasterClient.java:178)
at tachyon.client.TachyonFS.connect(TachyonFS.java:290)
... 17 more
Caused by: tachyon.org.apache.thrift.transport.TTransportException:
java.net.ConnectException: Connection refused
at
tachyon.org.apache.thrift.transport.TSocket.open(TSocket.java:185)
at
tachyon.org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81)
at tachyon.master.MasterClient.connect(MasterClient.java:156)
... 18 more
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at
tachyon.org.apache.thrift.transport.TSocket.open(TSocket.java:180)
... 20 more


 What do I need to do before I can use tachyon?

thanks
Daniel


Re: problem writing to s3

2015-04-23 Thread Daniel Mahler
Hi Akhil

I can confirm that the problem goes away when jsonRaw and jsonClean are in
different s3 buckets.
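
For the record, a minimal sketch of that workaround (the bucket names are
made up) -- the two output prefixes simply point at different buckets:

def jsonRaw   = "s3n://raw-bucket/json-raw"
def jsonClean = "s3n://clean-bucket/json-clean"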

thanks
Daniel

On Thu, Apr 23, 2015 at 1:27 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Can you try writing to a different S3 bucket and confirm that?

 Thanks
 Best Regards

 On Thu, Apr 23, 2015 at 12:11 AM, Daniel Mahler dmah...@gmail.com wrote:

 Hi Akhil,

 It works fine when outprefix is a hdfs:///localhost/... url.

 It looks to me as if there is something about spark writing to the same
 s3 bucket it is reading from.

 That is the only real difference between the two saveAsTextFile calls when
 outprefix is on s3:
 inpath is also on s3 but in a different bucket, while jsonRaw and jsonClean
 are distinct directories in the same bucket.
 I do not know why that should be a problem though.

 I will rerun using s3 paths and send the log information.

 thanks
 Daniel

 thanks
 Daniel

 On Wed, Apr 22, 2015 at 1:45 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you look in your worker logs and see whats happening in there? Are
 you able to write the same to your HDFS?

 Thanks
 Best Regards

 On Wed, Apr 22, 2015 at 4:45 AM, Daniel Mahler dmah...@gmail.com
 wrote:

 I am having a strange problem writing to s3 that I have distilled to
 this minimal example:

 def jsonRaw = s"${outprefix}-json-raw"
 def jsonClean = s"${outprefix}-json-clean"

 val txt = sc.textFile(inpath)//.coalesce(shards, false)
 txt.count

 val res = txt.saveAsTextFile(jsonRaw)

 val txt2 = sc.textFile(jsonRaw + "/part-*")
 txt2.count

 txt2.saveAsTextFile(jsonClean)

 This code should simply copy files from inpath to jsonRaw and then from
 jsonRaw to jsonClean.
 This code executes all the way down to the last line, where it hangs
 after creating the output directory containing a _temporary_$folder but no
 actual files, not even temporary ones.

 `outputprefix` is an s3 bucket url; both jsonRaw and jsonClean are in
 the same bucket.
 Both .count calls succeed and return the same number. This means Spark
 can read from inpath and can both read from and write to jsonRaw. Since
 jsonClean is in the same bucket as jsonRaw and the final line does create
 the directory, I cannot think of any reason why the files should not be
 written. If there were any access or url problems they should already
 manifest when writing jsonRaw.

 This problem is completely reproducible with Spark 1.2.1 and 1.3.1.
 The console output from the last line is

 scala> txt0.saveAsTextFile(jsonClean)
 15/04/21 22:55:48 INFO storage.BlockManager: Removing broadcast 3
 15/04/21 22:55:48 INFO storage.BlockManager: Removing block
 broadcast_3_piece0
 15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3_piece0 of
 size 2024 dropped from memory (free 278251716)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-51-181-81.ec2.internal:45199 in memory (size:
 2024.0 B, free: 265.4 MB)
 15/04/21 22:55:48 INFO storage.BlockManagerMaster: Updated info of
 block broadcast_3_piece0
 15/04/21 22:55:48 INFO storage.BlockManager: Removing block broadcast_3
 15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3 of size
 2728 dropped from memory (free 27825)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-166-129-153.ec2.internal:46671 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-51-153-34.ec2.internal:51691 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-158-142-155.ec2.internal:54690 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-61-144-7.ec2.internal:44849 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-69-77-180.ec2.internal:42417 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO spark.ContextCleaner: Cleaned broadcast 3
 15/04/21 22:55:49 INFO spark.SparkContext: Starting job: saveAsTextFile
 at console:38
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Got job 2
 (saveAsTextFile at console:38) with 96 output partitions
 (allowLocal=false)
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Final stage: Stage
 2(saveAsTextFile at console:38)
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Parents of final stage:
 List()
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Missing parents: List()
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Submitting Stage 2
 (MapPartitionsRDD[5] at saveAsTextFile at console:38), which has no
 missing parents
 15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(22248)
 called with curMem=48112, maxMem=278302556
 15/04/21 22:55:49 INFO storage.MemoryStore: Block broadcast_4 stored as
 values in memory (estimated size 21.7 KB, free 265.3 MB)
 15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(17352

spark-ec2 s3a filesystem support and hadoop versions

2015-04-22 Thread Daniel Mahler
I would like to easily launch a cluster that supports s3a file systems.

if I launch a cluster with `spark-ec2 --hadoop-major-version=2`,
what determines the minor version of hadoop?

Does it depend on the spark version being launched?

Are there other allowed values for --hadoop-major-version besides 1 and 2?

How can I get a cluster that supports s3a filesystems?
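
For context, assuming a cluster built against Hadoop 2.6+ with the hadoop-aws
jar on the classpath, using s3a from the shell would look roughly like this
(a sketch, not spark-ec2 specific; the credentials are placeholders):

sc.hadoopConfiguration.set("fs.s3a.access.key", "...")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "...")
val rdd = sc.textFile("s3a://bucket/path/*")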

thanks
Daniel


problem writing to s3

2015-04-21 Thread Daniel Mahler
I am having a strange problem writing to s3 that I have distilled to this
minimal example:

def jsonRaw = s"${outprefix}-json-raw"
def jsonClean = s"${outprefix}-json-clean"

val txt = sc.textFile(inpath)//.coalesce(shards, false)
txt.count

val res = txt.saveAsTextFile(jsonRaw)

val txt2 = sc.textFile(jsonRaw + "/part-*")
txt2.count

txt2.saveAsTextFile(jsonClean)

This code should simply copy files from inpath to jsonRaw and then from
jsonRaw to jsonClean.
This code executes all the way down to the last line, where it hangs after
creating the output directory containing a _temporary_$folder but no
actual files, not even temporary ones.

`outputprefix` is an s3 bucket url; both jsonRaw and jsonClean are in the
same bucket.
Both .count calls succeed and return the same number. This means Spark can
read from inpath and can both read from and write to jsonRaw. Since
jsonClean is in the same bucket as jsonRaw and the final line does create
the directory, I cannot think of any reason why the files should not be
written. If there were any access or url problems they should already
manifest when writing jsonRaw.

This problem is completely reproducible with Spark 1.2.1 and 1.3.1.
The console output from the last line is

scala> txt0.saveAsTextFile(jsonClean)
15/04/21 22:55:48 INFO storage.BlockManager: Removing broadcast 3
15/04/21 22:55:48 INFO storage.BlockManager: Removing block
broadcast_3_piece0
15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3_piece0 of
size 2024 dropped from memory (free 278251716)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on ip-10-51-181-81.ec2.internal:45199 in memory (size: 2024.0 B, free:
265.4 MB)
15/04/21 22:55:48 INFO storage.BlockManagerMaster: Updated info of block
broadcast_3_piece0
15/04/21 22:55:48 INFO storage.BlockManager: Removing block broadcast_3
15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3 of size 2728
dropped from memory (free 27825)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on ip-10-166-129-153.ec2.internal:46671 in memory (size: 2024.0 B, free:
13.8 GB)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on ip-10-51-153-34.ec2.internal:51691 in memory (size: 2024.0 B, free: 13.8
GB)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on ip-10-158-142-155.ec2.internal:54690 in memory (size: 2024.0 B, free:
13.8 GB)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on ip-10-61-144-7.ec2.internal:44849 in memory (size: 2024.0 B, free: 13.8
GB)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on ip-10-69-77-180.ec2.internal:42417 in memory (size: 2024.0 B, free: 13.8
GB)
15/04/21 22:55:48 INFO spark.ContextCleaner: Cleaned broadcast 3
15/04/21 22:55:49 INFO spark.SparkContext: Starting job: saveAsTextFile at
console:38
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Got job 2 (saveAsTextFile at
console:38) with 96 output partitions (allowLocal=false)
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Final stage: Stage
2(saveAsTextFile at console:38)
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Parents of final stage:
List()
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Submitting Stage 2
(MapPartitionsRDD[5] at saveAsTextFile at console:38), which has no
missing parents
15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(22248) called
with curMem=48112, maxMem=278302556
15/04/21 22:55:49 INFO storage.MemoryStore: Block broadcast_4 stored as
values in memory (estimated size 21.7 KB, free 265.3 MB)
15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(17352) called
with curMem=70360, maxMem=278302556
15/04/21 22:55:49 INFO storage.MemoryStore: Block broadcast_4_piece0 stored
as bytes in memory (estimated size 16.9 KB, free 265.3 MB)
15/04/21 22:55:49 INFO storage.BlockManagerInfo: Added broadcast_4_piece0
in memory on ip-10-51-181-81.ec2.internal:45199 (size: 16.9 KB, free: 265.4
MB)
15/04/21 22:55:49 INFO storage.BlockManagerMaster: Updated info of block
broadcast_4_piece0
15/04/21 22:55:49 INFO spark.SparkContext: Created broadcast 4 from
broadcast at DAGScheduler.scala:839
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Submitting 96 missing tasks
from Stage 2 (MapPartitionsRDD[5] at saveAsTextFile at console:38)
15/04/21 22:55:49 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0
with 96 tasks
15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
2.0 (TID 192, ip-10-166-129-153.ec2.internal, PROCESS_LOCAL, 1377 bytes)
15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
2.0 (TID 193, ip-10-61-144-7.ec2.internal, PROCESS_LOCAL, 1377 bytes)
15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
2.0 (TID 194, ip-10-158-142-155.ec2.internal, PROCESS_LOCAL, 1377 bytes)
15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 3.0 

HiveContext vs SQLContext

2015-04-20 Thread Daniel Mahler
Is HiveContext still preferred over SQLContext?
What are the current (1.3.1) differences between them?
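
For reference, a minimal sketch of constructing both in 1.3.x; as far as I
know HiveContext extends SQLContext and additionally provides the HiveQL
parser, Hive UDFs, and access to a Hive metastore:

val sqlCtx  = new org.apache.spark.sql.SQLContext(sc)
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)   // superset of SQLContext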

thanks
Daniel


Re: Problem getting program to run on 15TB input

2015-04-13 Thread Daniel Mahler
Sometimes a large number of partitions leads to memory problems.
Something like

val rdd1 = sc.textFile(file1).coalesce(500). ...
val rdd2 = sc.textFile(file2).coalesce(500). ...

may help.


On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote:

 Everything works smoothly if I do the 99%-removal filter in Hive first.
 So, all the baggage from garbage collection was breaking it.

 Is there a way to filter() out 99% of the data without having to garbage
 collect 99% of the RDD?

 On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote:

 I tried a shorter simpler version of the program, with just 1 RDD,
  essentially it is:

 sc.textFile(..., N).map().filter().map( blah => (id,
 1L)).reduceByKey().saveAsTextFile(...)

 Here is a typical GC log trace from one of the yarn container logs:

 54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]
 9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
 real=0.02 secs]
 77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)]
 9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
 real=0.04 secs]
 79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)]
 9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
 real=0.08 secs]
 92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)]
 9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
 real=0.02 secs]
 114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)]
 9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
 real=0.02 secs]
 117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)]
 9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
 real=0.02 secs]

 So ~9GB is getting GC'ed every few seconds. Which seems like a lot.

 Question: The filter() is removing 99% of the data. Does this 99% of the
 data get GC'ed?

 Now, I was able to finally get to reduceByKey() by reducing the number of
 executor-cores (to 2), based on suggestions at
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
 . This makes everything before reduceByKey() run pretty smoothly.

 I ran this with more executor-memory and less executors (most important
 thing was fewer executor-cores):

 --num-executors 150 \
 --driver-memory 15g \
 --executor-memory 110g \
 --executor-cores 32 \

 But then, reduceByKey() fails with:

 java.lang.OutOfMemoryError: Java heap space




 On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The Spark UI names the line number and name of the operation
 (repartition in this case) that it is performing. Only if this information
 is wrong (just a possibility), could it have started groupByKey already.

 I will try to analyze the amount of skew in the data by using
 reduceByKey (or simply countByKey) which is relatively inexpensive. For the
 purposes of this algorithm I can simply log and remove keys with huge
 counts, before doing groupByKey.
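
A rough sketch of that kind of skew check (the pair RDD name and the count
threshold are hypothetical):

val keyCounts = pairs.mapValues(_ => 1L).reduceByKey(_ + _)
val hotKeys   = keyCounts.filter { case (_, n) => n > 1000000L }.keys.collect().toSet
val pruned    = pairs.filter { case (k, _) => !hotKeys.contains(k) }
// pruned can now be grouped without the heaviest keys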

 On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 All stated symptoms are consistent with GC pressure (other nodes
 timeout trying to connect because of a long stop-the-world), quite possibly
 due to groupByKey. groupByKey is a very expensive operation as it may bring
 all the data for a particular partition into memory (in particular, it
 cannot spill values for a single key, so if you have a single very skewed
 key you can get behavior like this).

 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 But groupByKey will repartition according to the number of keys, as I
 understand how it works. How do you know that you haven't reached the
 groupByKey phase? Are you using a profiler or do you base that assumption
 only on logs?

 On Sat, Feb 28, 2015 at 8:12 PM, Arun Luthra 
 arun.lut...@gmail.com wrote:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend:
 Driver Disassociated [akka.tcp://sparkExecutor@...] ->
 [akka.tcp://sparkDriver@...] disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task
 421027.0 in stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 

Cleaning/transforming json befor converting to SchemaRDD

2014-11-03 Thread Daniel Mahler
I am trying to convert terabytes of json log files into parquet files,
but I need to clean them a little first.
I end up doing the following:

val txt = sc.textFile(inpath).coalesce(800)

val json = (for {
  line <- txt
  JObject(child) = parse(line)
  child2 = (for {
    JField(name, value) <- child
    _ <- patt(name) // filter fields with invalid names
  } yield JField(name.toLowerCase, value))
} yield compact(render(JObject(child2))))

sqx.jsonRDD(json, 5e-2).saveAsParquetFile(outpath)

One glaring inefficiency is that after parsing and cleaning the data I
reserialize it
by calling compact(render(JObject(child2))), only to pass the text
to jsonRDD to be parsed again. However I see no way to turn an RDD of
json4s objects directly into a SchemaRDD without turning it back into text
first.

Is there any way to do this?
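
If the fields of interest were known up front, one alternative sketch (not a
drop-in replacement for the dynamic field cleaning above) would be to extract
case classes directly and rely on the implicit RDD-of-Product to SchemaRDD
conversion, avoiding the re-serialization entirely:

import sqx.createSchemaRDD   // implicit RDD[Product] => SchemaRDD (Spark 1.x)

case class Rec(client: Option[String])   // hypothetical fixed schema

val recs = txt.flatMap { line =>
  implicit val formats = org.json4s.DefaultFormats
  scala.util.Try(parse(line).extract[Rec]).toOption   // drops unparsable records
}
recs.saveAsParquetFile(outpath)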

I am also open to other suggestions for speeding up the above code,
it is very slow in its current form.

I would also like to make jsonFile drop invalid json records rather than
failing the entire job. Is that possible?

thanks
Daniel


union of SchemaRDDs

2014-11-01 Thread Daniel Mahler
I would like to combine 2 parquet tables I have created.
I tried:

  sc.union(sqx.parquetFile(fileA), sqx.parquetFile(fileB))

but that just returns RDD[Row].
How do I combine them to get a SchemaRDD[Row]?

thanks
Daniel


Re: union of SchemaRDDs

2014-11-01 Thread Daniel Mahler
Thanks Matei. What does unionAll do if the input RDD schemas are not 100%
compatible? Does it take the union of the columns and generalize the types?

thanks
Daniel

On Sat, Nov 1, 2014 at 6:08 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Try unionAll, which is a special method on SchemaRDDs that keeps the
 schema on the results.

 Matei
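
A minimal sketch of that with the SchemaRDD API:

val a = sqx.parquetFile(fileA)
val b = sqx.parquetFile(fileB)
val combined = a.unionAll(b)   // stays a SchemaRDD, unlike sc.union(a, b)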

  On Nov 1, 2014, at 3:57 PM, Daniel Mahler dmah...@gmail.com wrote:
 
  I would like to combine 2 parquet tables I have created.
  I tried:
 
sc.union(sqx.parquetFile(fileA), sqx.parquetFile(fileB))
 
  but that just returns RDD[Row].
  How do I combine them to get a SchemaRDD[Row]?
 
  thanks
  Daniel




Re: use additional ebs volumes for hdfs storage with spark-ec2

2014-10-30 Thread Daniel Mahler
Thanks Akhil. I tried changing /root/ephemeral-hdfs/conf/hdfs-site.xml to
have

  <property>
    <name>dfs.data.dir</name>
    <value>/vol,/vol0,/vol1,/vol2,/vol3,/vol4,/vol5,/vol6,/vol7,/mnt/ephemeral-hdfs/data,/mnt2/ephemeral-hdfs/data</value>
  </property>

and then running

/root/ephemeral-hdfs/bin/stop-all.sh
copy-dir  /root/ephemeral-hdfs/conf/
/root/ephemeral-hdfs/bin/start-all.sh

to try to make sure the new configuration takes effect on the entire cluster.
I then ran spark to write to the local hdfs.
It failed after filling the original /mnt* mounted drives,
without writing anything to the attached /vol* drives.

I also tried completely stopping and restarting the cluster,
but restarting resets /root/ephemeral-hdfs/conf/hdfs-site.xml to the
default state.

thanks
Daniel



On Thu, Oct 30, 2014 at 1:56 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 I think you can check in the core-site.xml or hdfs-site.xml file under
 /root/ephemeral-hdfs/etc/hadoop/ where you can see data node dir property
 which will be a comma separated list of volumes.

 Thanks
 Best Regards

 On Thu, Oct 30, 2014 at 5:21 AM, Daniel Mahler dmah...@gmail.com wrote:

 I started my ec2 spark cluster with

 ./ec2/spark-ec2 --ebs-vol-{size=100,num=8,type=gp2} -t m3.xlarge -s 10
 launch mycluster

 I see the additional volumes attached but they do not seem to be set up
 for hdfs.
 How can I check if they are being utilized on all workers,
 and how can I get all workers to utilize the extra volumes for hdfs.
 I do not have experience using hadoop directly, only through spark.

 thanks
 Daniel





use additional ebs volumes for hdfs storage with spark-ec2

2014-10-29 Thread Daniel Mahler
I started my ec2 spark cluster with

./ec2/spark-ec2 --ebs-vol-{size=100,num=8,type=gp2} -t m3.xlarge -s 10
launch mycluster

I see the additional volumes attached but they do not seem to be set up for
hdfs.
How can I check if they are being utilized on all workers,
and how can I get all workers to utilize the extra volumes for hdfs.
I do not have experience using hadoop directly, only through spark.

thanks
Daniel


Fwd: Saving very large data sets as Parquet on S3

2014-10-24 Thread Daniel Mahler
I am trying to convert some json logs to Parquet and save them on S3.
In principle this is just

import org.apache.spark._
val sqlContext = new sql.SQLContext(sc)
val data = sqlContext.jsonFile("s3n://source/path/*/*", 10e-8)
data.registerAsTable("data")
data.saveAsParquetFile("s3n://target/path")

This works fine for up to about 10^9 records, but above that I start
having problems.
The first problem I encountered is that after the data files get written out,
writing the Parquet summary file fails.
While I seem to have all the data saved out,
programs have a huge start up time
when processing parquet files without a summary file.

Writing the summary file appears to primarily depend
on the number of partitions being written,
and to be relatively independent of the amount of data being written.
Problems start after about 1000 partitions;
writing 1 partitions fails even with a repartitioned day's worth of
data.

My data is very finely partitioned, about 16 log files per hour, or 13K
files per month.
The file sizes are very uneven, ranging over several orders of magnitude.
There are several years of data.
By my calculations this will produce 10s of terabytes of Parquet files.

The first thing I tried to get around this problem
was passing the data through `coalesce(1000, shuffle=false)` before
writing.
This works up to about a month's worth of data;
after that coalescing to 1000 partitions produces parquet files larger than
5G
and writing to S3 fails as a result.
Also coalescing slows processing down by at least a factor of 2.
I do not understand why this should happen since I use shuffle=false.
AFAIK coalesce should just be a bookkeeping trick and the original
partitions should be processed pretty much the same as before, just with
their outputs concatenated.

The only other option I can think of is to write each month coalesced
as a separate data set with its own summary file
and union the RDDs when processing the data,
but I do not know how much overhead that will introduce.
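
A sketch of what that per-month option might look like at read time (the
month list and path layout are hypothetical):

val months  = Seq("2014-01", "2014-02", "2014-03")
val monthly = months.map(m => sqlContext.parquetFile(s"s3n://target/path/$m"))
val all     = monthly.reduce(_ unionAll _)   // one SchemaRDD over all months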

I am looking for advice on the best way to save data of this size in Parquet
on S3.
Apart from solving the summary file issue I am also looking for ways to
improve performance.
Would it make sense to write the data to a local hdfs first and push it to
S3 with `hadoop distcp`?
Is putting Tachyon in front of either the input or the output S3 likely to
help?
If yes which is likely to help more?

I set options on the master as follows

+
cat <<EOF >> ~/spark/conf/spark-defaults.conf
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.rdd.compress  true
spark.shuffle.consolidateFiles  true
spark.akka.frameSize  20
EOF

copy-dir /root/spark/conf
spark/sbin/stop-all.sh
sleep 5
spark/sbin/start-all.sh
++

Does this make sense? Should I set some other options?
I have also asked these questions on StackOverflow where I reproduced the
full error messages:

+
http://stackoverflow.com/questions/26332542/how-to-save-a-multi-terabyte-schemardd-in-parquet-format-on-s3
+
http://stackoverflow.com/questions/26321947/multipart-uploads-to-amazon-s3-from-apache-spark
+
http://stackoverflow.com/questions/26291165/spark-sql-unable-to-complete-writing-parquet-data-with-a-large-number-of-shards

thanks
Daniel


Saving very large data sets as Parquet on S3

2014-10-20 Thread Daniel Mahler
I am trying to convert some json logs to Parquet and save them on S3.
In principle this is just

import org.apache.spark._
val sqlContext = new sql.SQLContext(sc)
val data = sqlContext.jsonFile("s3n://source/path/*/*", 10e-8)
data.registerAsTable("data")
data.saveAsParquetFile("s3n://target/path")

This works fine for up to about 10^9 records, but above that I start
having problems.
The first problem I encountered is that after the data files get written out,
writing the Parquet summary file fails.
While I seem to have all the data saved out,
programs have a huge start up time
when processing parquet files without a summary file.

Writing the summary file appears to primarily depend
on the number of partitions being written,
and to be relatively independent of the amount of data being written.
Problems start after about 1000 partitions;
writing 1 partitions fails even with a repartitioned day's worth of
data.

My data is very finely partitioned, about 16 log files per hour, or 13K
files per month.
The file sizes are very uneven, ranging over several orders of magnitude.
There are several years of data.
By my calculations this will produce 10s of terabytes of Parquet files.

The first thing I tried to get around this problem
was passing the data through `coalesce(1000, shuffle=false)` before
writing.
This works up to about a month's worth of data;
after that coalescing to 1000 partitions produces parquet files larger than
5G
and writing to S3 fails as a result.
Also coalescing slows processing down by at least a factor of 2.
I do not understand why this should happen since I use shuffle=false.
AFAIK coalesce should just be a bookkeeping trick and the original
partitions should be processed pretty much the same as before, just with
their outputs concatenated.

The only other option I can think of is to write each month coalesced
as a separate data set with its own summary file
and union the RDDs when processing the data,
but I do not know how much overhead that will introduce.

I am looking for advice on the best way to save data of this size in Parquet
on S3.
Apart from solving the summary file issue I am also looking for ways to
improve performance.
Would it make sense to write the data to a local hdfs first and push it to
S3 with `hadoop distcp`?
Is putting Tachyon in front of either the input or the output S3 likely to
help?
If yes which is likely to help more?

I set options on the master as follows

+
cat <<EOF >> ~/spark/conf/spark-defaults.conf
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.rdd.compress  true
spark.shuffle.consolidateFiles  true
spark.akka.frameSize  20
EOF

copy-dir /root/spark/conf
spark/sbin/stop-all.sh
sleep 5
spark/sbin/start-all.sh
++

Does this make sense? Should I set some other options?
I have also asked these questions on StackOverflow where I reproduced the
full error messages:

+
http://stackoverflow.com/questions/26332542/how-to-save-a-multi-terabyte-schemardd-in-parquet-format-on-s3
+
http://stackoverflow.com/questions/26321947/multipart-uploads-to-amazon-s3-from-apache-spark
+
http://stackoverflow.com/questions/26291165/spark-sql-unable-to-complete-writing-parquet-data-with-a-large-number-of-shards

thanks
Daniel


Getting spark to use more than 4 cores on Amazon EC2

2014-10-20 Thread Daniel Mahler
I am launching EC2 clusters using the spark-ec2 scripts.
My understanding is that this configures spark to use the available
resources.
I can see that spark will use the available memory on larger instance types.
However I have never seen spark running at more than 400% (using 100% on 4
cores)
on machines with many more cores.
Am I misunderstanding the docs? Is it just that high end ec2 instances get
I/O starved when running spark? It would be strange if that consistently
produced a 400% hard limit though.

thanks
Daniel


Re: Getting spark to use more than 4 cores on Amazon EC2

2014-10-20 Thread Daniel Mahler
I usually run interactively from the spark-shell.
My data definitely has more than enough partitions to keep all the workers
busy.
When I first launch the cluster I first do:

+
cat <<EOF >> ~/spark/conf/spark-defaults.conf
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.rdd.compress  true
spark.shuffle.consolidateFiles  true
spark.akka.frameSize  20
EOF

copy-dir /root/spark/conf
spark/sbin/stop-all.sh
sleep 5
spark/sbin/start-all.sh
+

before starting the spark-shell or running any jobs.




On Mon, Oct 20, 2014 at 2:57 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Perhaps your RDD is not partitioned enough to utilize all the cores in
 your system.

 Could you post a simple code snippet and explain what kind of parallelism
 you are seeing for it? And can you report on how many partitions your RDDs
 have?

 On Mon, Oct 20, 2014 at 3:53 PM, Daniel Mahler dmah...@gmail.com wrote:


 I am launching EC2 clusters using the spark-ec2 scripts.
 My understanding is that this configures spark to use the available
 resources.
 I can see that spark will use the available memory on larger instance
 types.
 However I have never seen spark running at more than 400% (using 100% on
 4 cores)
 on machines with many more cores.
 Am I misunderstanding the docs? Is it just that high end ec2 instances
 get I/O starved when running spark? It would be strange if that
 consistently produced a 400% hard limit though.

 thanks
 Daniel





Re: Getting spark to use more than 4 cores on Amazon EC2

2014-10-20 Thread Daniel Mahler
I launch the cluster using vanilla spark-ec2 scripts.
I just specify the number of slaves and instance type

On Mon, Oct 20, 2014 at 4:07 PM, Daniel Mahler dmah...@gmail.com wrote:

 I usually run interactively from the spark-shell.
 My data definitely has more than enough partitions to keep all the workers
 busy.
 When I first launch the cluster I first do:

 +
 cat <<EOF >> ~/spark/conf/spark-defaults.conf
 spark.serializer        org.apache.spark.serializer.KryoSerializer
 spark.rdd.compress  true
 spark.shuffle.consolidateFiles  true
 spark.akka.frameSize  20
 EOF

 copy-dir /root/spark/conf
 spark/sbin/stop-all.sh
 sleep 5
 spark/sbin/start-all.sh
 +

 before starting the spark-shell or running any jobs.




 On Mon, Oct 20, 2014 at 2:57 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Perhaps your RDD is not partitioned enough to utilize all the cores in
 your system.

 Could you post a simple code snippet and explain what kind of parallelism
 you are seeing for it? And can you report on how many partitions your RDDs
 have?

 On Mon, Oct 20, 2014 at 3:53 PM, Daniel Mahler dmah...@gmail.com wrote:


 I am launching EC2 clusters using the spark-ec2 scripts.
 My understanding is that this configures spark to use the available
 resources.
 I can see that spark will use the available memory on larger instance
 types.
 However I have never seen spark running at more than 400% (using 100% on
 4 cores)
 on machines with many more cores.
 Am I misunderstanding the docs? Is it just that high end ec2 instances
 get I/O starved when running spark? It would be strange if that
 consistently produced a 400% hard limit though.

 thanks
 Daniel






Re: Getting spark to use more than 4 cores on Amazon EC2

2014-10-20 Thread Daniel Mahler
I am using globs though

raw = sc.textFile("/path/to/dir/*/*")

and I have tons of files so 1 file per partition should not be a problem.

On Mon, Oct 20, 2014 at 7:14 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 The biggest danger with gzipped files is this:

  >>> raw = sc.textFile("/path/to/file.gz", 8)
  >>> raw.getNumPartitions()
  1

 You think you’re telling Spark to parallelize the reads on the input, but
 Spark cannot parallelize reads against gzipped files. So 1 gzipped file
 gets assigned to 1 partition.

 It might be a nice user hint if Spark warned when parallelism is disabled
 by the input format.
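
A sketch of the usual workaround when the inputs are gzipped (the multiplier
is arbitrary): read them as-is, then repartition so the later stages use all
cores.

val raw    = sc.textFile("/path/to/dir/*/*")             // one partition per .gz file
val spread = raw.repartition(8 * sc.defaultParallelism)  // shuffle to rebalance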

 Nick

 On Mon, Oct 20, 2014 at 6:53 PM, Daniel Mahler dmah...@gmail.com wrote:

 Hi Nicholas,

 Gzipping is an impressive guess! Yes, they are.
 My data sets are too large to make repartitioning viable, but I could try
 it on a subset.
 I generally have many more partitions than cores.
 This was happening before I started setting those configs.

 thanks
 Daniel


 On Mon, Oct 20, 2014 at 5:37 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Are you dealing with gzipped files by any chance? Does explicitly
 repartitioning your RDD to match the number of cores in your cluster help
 at all? How about if you don't specify the configs you listed and just go
 with defaults all around?

 On Mon, Oct 20, 2014 at 5:22 PM, Daniel Mahler dmah...@gmail.com
 wrote:

 I launch the cluster using vanilla spark-ec2 scripts.
 I just specify the number of slaves and instance type

 On Mon, Oct 20, 2014 at 4:07 PM, Daniel Mahler dmah...@gmail.com
 wrote:

 I usually run interactively from the spark-shell.
 My data definitely has more than enough partitions to keep all the
 workers busy.
 When I first launch the cluster I first do:

 +
 cat <<EOF >> ~/spark/conf/spark-defaults.conf
 spark.serializer        org.apache.spark.serializer.KryoSerializer
 spark.rdd.compress  true
 spark.shuffle.consolidateFiles  true
 spark.akka.frameSize  20
 EOF

 copy-dir /root/spark/conf
 spark/sbin/stop-all.sh
 sleep 5
 spark/sbin/start-all.sh
 +

 before starting the spark-shell or running any jobs.




 On Mon, Oct 20, 2014 at 2:57 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Perhaps your RDD is not partitioned enough to utilize all the cores
 in your system.

 Could you post a simple code snippet and explain what kind of
 parallelism you are seeing for it? And can you report on how many
 partitions your RDDs have?

 On Mon, Oct 20, 2014 at 3:53 PM, Daniel Mahler dmah...@gmail.com
 wrote:


 I am launching EC2 clusters using the spark-ec2 scripts.
 My understanding is that this configures spark to use the available
 resources.
 I can see that spark will use the available memory on larger instance
 types.
 However I have never seen spark running at more than 400% (using
 100% on 4 cores)
 on machines with many more cores.
 Am I misunderstanding the docs? Is it just that high end ec2
 instances get I/O starved when running spark? It would be strange if 
 that
 consistently produced a 400% hard limit though.

 thanks
 Daniel










Re: sync master with slaves with bittorrent?

2014-05-19 Thread Daniel Mahler
btw is there a command or script to update the slaves from the master?

thanks
Daniel


On Mon, May 19, 2014 at 1:48 AM, Andrew Ash and...@andrewash.com wrote:

 If the codebase for Spark's broadcast is pretty self-contained, you could
 consider creating a small bootstrap sent out via the doubling rsync
 strategy that Mosharaf outlined above (called Tree D=2 in the paper) that
 then pulled the larger

 Mosharaf, do you have a sense of whether the gains from using Cornet vs
 Tree D=2 with rsync outweighs the overhead of using a 2-phase broadcast
 mechanism?

 Andrew


 On Sun, May 18, 2014 at 11:32 PM, Aaron Davidson ilike...@gmail.comwrote:

 One issue with using Spark itself is that this rsync is required to get
 Spark to work...

 Also note that a similar strategy is used for *updating* the spark
 cluster on ec2, where the diff aspect is much more important, as you
 might only make a small change on the driver node (recompile or
 reconfigure) and can get a fast sync.


 On Sun, May 18, 2014 at 11:22 PM, Mosharaf Chowdhury 
 mosharafka...@gmail.com wrote:

 What twitter calls murder, unless it has changed since then, is just a
 BitTornado wrapper. In 2011, We did some comparison on the performance of
 murder and the TorrentBroadcast we have right now for Spark's own broadcast
 (Section 7.1 in
 http://www.mosharaf.com/wp-content/uploads/orchestra-sigcomm11.pdf).
 Spark's implementation was 4.5X faster than murder.

 The only issue with using TorrentBroadcast to deploy code/VM is writing
 a wrapper around it to read from disk, but it shouldn't be too complicated.
 If someone picks it up, I can give some pointers on how to proceed (I've
 thought about doing it myself forever, but never ended up actually taking
 the time; right now I don't have enough free cycles either)

 Otherwise, murder/BitTornado would be better than the current strategy
 we have.

 A third option would be to use rsync; but instead of rsync-ing to every
 slave from the master, one can simply rsync from the master first to one
 slave; then use the two sources (master and the first slave) to rsync to
 two more; then four and so on. Might be a simpler solution without many
 changes.

 --
 Mosharaf Chowdhury
 http://www.mosharaf.com/


 On Sun, May 18, 2014 at 11:07 PM, Andrew Ash and...@andrewash.comwrote:

 My first thought would be to use libtorrent for this setup, and it
 turns out that both Twitter and Facebook do code deploys with a bittorrent
 setup.  Twitter even released their code as open source:


 https://blog.twitter.com/2010/murder-fast-datacenter-code-deploys-using-bittorrent


 http://arstechnica.com/business/2012/04/exclusive-a-behind-the-scenes-look-at-facebook-release-engineering/


 On Sun, May 18, 2014 at 10:44 PM, Daniel Mahler dmah...@gmail.comwrote:

 I am not an expert in this space either. I thought the initial rsync
 during launch is really just a straight copy that did not need the tree
 diff. So it seemed like having the slaves do the copying among each
 other would be better than having the master copy to everyone directly.
 That made me think of bittorrent, though there may well be other systems
 that do this.
 From the launches I did today it seems that it is taking around 1
 minute per slave to launch a cluster, which can be a problem for clusters
 with 10s or 100s of slaves, particularly since on ec2  that time has to be
 paid for.


 On Sun, May 18, 2014 at 11:54 PM, Aaron Davidson 
 ilike...@gmail.comwrote:

 Out of curiosity, do you have a library in mind that would make it
 easy to setup a bit torrent network and distribute files in an rsync 
 (i.e.,
 apply a diff to a tree, ideally) fashion? I'm not familiar with this 
 space,
 but we do want to minimize the complexity of our standard ec2 launch
 scripts to reduce the chance of something breaking.


 On Sun, May 18, 2014 at 9:22 PM, Daniel Mahler dmah...@gmail.comwrote:

 I am launching a rather large cluster on ec2.
 It seems like the launch is taking forever on
 
 Setting up spark
 RSYNC'ing /root/spark to slaves...
 ...

 It seems that bittorrent might be a faster way to replicate
 the sizeable spark directory to the slaves
 particularly if there is a lot of not very powerful slaves.

 Just a thought ...

 cheers
 Daniel










Re: sync master with slaves with bittorrent?

2014-05-19 Thread Daniel Mahler
On Mon, May 19, 2014 at 2:04 AM, Daniel Mahler dmah...@gmail.com wrote:

 I agree that for updating rsync is probably preferable, and it seems like
 for that purpose it would also parallelize well, since most of the time is
 spent computing checksums, so the process is not constrained by the total
 i/o capacity of the master. However it is a problem for the initial
 replication of the master to the slaves. If you are running on ec2 then the
 dollar overhead of launching is quadratic in the number of slaves. If you
 launch a 100 machine cluster you will wait 100 minutes, but you will pay
 for 10,000 machine-minutes, or about 167 hours,
 before anything useful starts to happen.


Launch time does *not* increase linearly with the number of slaves as I
thought I was seeing.
It would still be nice to have a faster launch though.

cheers
Daniel



 On Mon, May 19, 2014 at 1:32 AM, Aaron Davidson ilike...@gmail.comwrote:

 One issue with using Spark itself is that this rsync is required to get
 Spark to work...

  Also note that a similar strategy is used for *updating* the spark
 cluster on ec2, where the diff aspect is much more important, as you
 might only make a small change on the driver node (recompile or
 reconfigure) and can get a fast sync.


 On Sun, May 18, 2014 at 11:22 PM, Mosharaf Chowdhury 
 mosharafka...@gmail.com wrote:

 What twitter calls murder, unless it has changed since then, is just a
 BitTornado wrapper. In 2011, We did some comparison on the performance of
 murder and the TorrentBroadcast we have right now for Spark's own broadcast
 (Section 7.1 in
 http://www.mosharaf.com/wp-content/uploads/orchestra-sigcomm11.pdf).
 Spark's implementation was 4.5X faster than murder.

 The only issue with using TorrentBroadcast to deploy code/VM is writing
 a wrapper around it to read from disk, but it shouldn't be too complicated.
 If someone picks it up, I can give some pointers on how to proceed (I've
 thought about doing it myself forever, but never ended up actually taking
 the time; right now I don't have enough free cycles either)

 Otherwise, murder/BitTornado would be better than the current strategy
 we have.

 A third option would be to use rsync; but instead of rsync-ing to every
 slave from the master, one can simply rsync from the master first to one
 slave; then use the two sources (master and the first slave) to rsync to
 two more; then four and so on. Might be a simpler solution without many
 changes.

 --
 Mosharaf Chowdhury
 http://www.mosharaf.com/


 On Sun, May 18, 2014 at 11:07 PM, Andrew Ash and...@andrewash.comwrote:

 My first thought would be to use libtorrent for this setup, and it
 turns out that both Twitter and Facebook do code deploys with a bittorrent
 setup.  Twitter even released their code as open source:


 https://blog.twitter.com/2010/murder-fast-datacenter-code-deploys-using-bittorrent


 http://arstechnica.com/business/2012/04/exclusive-a-behind-the-scenes-look-at-facebook-release-engineering/


 On Sun, May 18, 2014 at 10:44 PM, Daniel Mahler dmah...@gmail.comwrote:

 I am not an expert in this space either. I thought the initial rsync
 during launch is really just a straight copy that did not need the tree
 diff. So it seemed like having the slaves do the copying among each
 other would be better than having the master copy to everyone directly.
 That made me think of bittorrent, though there may well be other systems
 that do this.
 From the launches I did today it seems that it is taking around 1
 minute per slave to launch a cluster, which can be a problem for clusters
 with 10s or 100s of slaves, particularly since on ec2  that time has to be
 paid for.


 On Sun, May 18, 2014 at 11:54 PM, Aaron Davidson 
 ilike...@gmail.comwrote:

 Out of curiosity, do you have a library in mind that would make it
 easy to setup a bit torrent network and distribute files in an rsync 
 (i.e.,
 apply a diff to a tree, ideally) fashion? I'm not familiar with this 
 space,
 but we do want to minimize the complexity of our standard ec2 launch
 scripts to reduce the chance of something breaking.


 On Sun, May 18, 2014 at 9:22 PM, Daniel Mahler dmah...@gmail.comwrote:

 I am launching a rather large cluster on ec2.
 It seems like the launch is taking forever on
 
 Setting up spark
 RSYNC'ing /root/spark to slaves...
 ...

 It seems that bittorrent might be a faster way to replicate
 the sizeable spark directory to the slaves
 particularly if there is a lot of not very powerful slaves.

 Just a thought ...

 cheers
 Daniel










sync master with slaves with bittorrent?

2014-05-18 Thread Daniel Mahler
I am launching a rather large cluster on ec2.
It seems like the launch is taking forever on

Setting up spark
RSYNC'ing /root/spark to slaves...
...

It seems that bittorrent might be a faster way to replicate
the sizeable spark directory to the slaves
particularly if there is a lot of not very powerful slaves.

Just a thought ...

cheers
Daniel


Re: sync master with slaves with bittorrent?

2014-05-18 Thread Daniel Mahler
I am not an expert in this space either. I thought the initial rsync during
launch is really just a straight copy that did not need the tree diff. So
it seemed like having the slaves do the copying among each other would
be better than having the master copy to everyone directly. That made me
think of bittorrent, though there may well be other systems that do this.
From the launches I did today it seems that it is taking around 1 minute
per slave to launch a cluster, which can be a problem for clusters with 10s
or 100s of slaves, particularly since on ec2  that time has to be paid for.


On Sun, May 18, 2014 at 11:54 PM, Aaron Davidson ilike...@gmail.com wrote:

 Out of curiosity, do you have a library in mind that would make it easy to
 setup a bit torrent network and distribute files in an rsync (i.e., apply a
 diff to a tree, ideally) fashion? I'm not familiar with this space, but we
 do want to minimize the complexity of our standard ec2 launch scripts to
 reduce the chance of something breaking.


 On Sun, May 18, 2014 at 9:22 PM, Daniel Mahler dmah...@gmail.com wrote:

 I am launching a rather large cluster on ec2.
 It seems like the launch is taking forever on
 
 Setting up spark
 RSYNC'ing /root/spark to slaves...
 ...

 It seems that bittorrent might be a faster way to replicate
 the sizeable spark directory to the slaves
 particularly if there is a lot of not very powerful slaves.

 Just a thought ...

 cheers
 Daniel





Configuring Spark for reduceByKey on on massive data sets

2014-05-17 Thread Daniel Mahler
I have had a lot of success with Spark on large datasets,
both in terms of performance and flexibility.
However I hit a wall with reduceByKey when the RDD contains billions of
items.
I am reducing with simple functions like addition for building histograms,
so the reduction process should be constant memory.
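
For concreteness, a minimal sketch of the kind of reduction I mean (the
bucketing function is hypothetical); passing an explicit partition count to
reduceByKey keeps individual shuffle partitions small:

val hist = items.map(x => (bucket(x), 1L)).reduceByKey(_ + _, 4096)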
I am using 10s of AWS-EC2 machines with 60G memory and 30 processors.

After a while the whole process just hangs.
I have not been able to isolate the root problem from the logs,
but I suspect that the problem is in the shuffling.
Simple mapping and filtering transformations work fine,
and the reductions work fine if I first reduce the data down to 10^8 items;
that makes the reduceByKey go through.

What do I need to do to make reduceByKey work for 10^9 items?

thanks
Daniel