Get spark metrics in code

2016-09-09 Thread Han JU
Hi, I'd like to know if there's a possibility to get Spark's metrics from code. For example: val sc = new SparkContext(conf) val result = myJob(sc, ...) result.save(...) val gauge = MetricSystem.getGauge("org.apache.spark") println(gauge.getValue) // or send it to internal
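(The MetricSystem.getGauge call above is the poster's wish, not an existing API; as far as I know Spark's internal metrics system isn't exposed publicly. A minimal sketch of one way to collect job numbers in code instead, using the public SparkListener hook; the parallelize job is just a stand-in for the myJob/save calls in the mail.)

```
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Aggregates a couple of per-task metrics while the job runs.
class SimpleMetricsListener extends SparkListener {
  val tasks     = new AtomicLong(0)
  val runTimeMs = new AtomicLong(0)
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    tasks.incrementAndGet()
    val m = taskEnd.taskMetrics
    if (m != null) runTimeMs.addAndGet(m.executorRunTime)
  }
}

val sc = new SparkContext(new SparkConf().setAppName("metrics-in-code").setMaster("local[2]"))
val listener = new SimpleMetricsListener
sc.addSparkListener(listener)

sc.parallelize(1 to 1000000).map(_ * 2).count()   // stand-in for myJob(sc, ...) + save
println(s"tasks: ${listener.tasks.get}, total executor run time: ${listener.runTimeMs.get} ms")
```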

Re: Spark SQL - Encoders - case class

2016-06-06 Thread Han JU
Hi, I think encoders for case classes are already provided in Spark. You just need to import them: val sql = new SQLContext(sc) import sql.implicits._ And then do the cast to a Dataset. 2016-06-06 14:13 GMT+02:00 Dave Maughan : > Hi, > > I've figured out how
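(A minimal, self-contained sketch of what this reply describes; the Person case class and the people.json path are made up for illustration.)

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Long)

val sc  = new SparkContext(new SparkConf().setAppName("case-class-encoders").setMaster("local[2]"))
val sql = new SQLContext(sc)
import sql.implicits._   // brings the implicit Encoders for case classes into scope

// DataFrame -> Dataset[Person] via the implicit encoder, then typed operations.
val people = sql.read.json("people.json").as[Person]
people.filter(_.age > 21).show()
```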

Re: Dataset API and avro type

2016-05-23 Thread Han JU
atabricks.com>: > That's definitely a bug. If you can come up with a small reproduction it > would be great if you could open a JIRA. > On May 22, 2016 12:21 PM, "Han JU" <ju.han.fe...@gmail.com> wrote: > >> Hi Michael, >> >> The error is like this

Dataset API and avro type

2016-05-20 Thread Han JU
Hello, I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0. However, it does not seem to work with Avro data types: object Datasets extends App { val conf = new SparkConf() conf.setAppName("Dataset") conf.setMaster("local[2]") conf.setIfMissing("spark.serializer",

zero-length input partitions from parquet

2016-05-02 Thread Han JU
Hi, I just found out that we can have lots of empty input partitions when reading from Parquet files. Sample code as follows: val hconf = sc.hadoopConfiguration val job = new Job(hconf) FileInputFormat.setInputPaths(job, new Path("path_to_data"))
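(A quick way to see the effect being described; a sketch that assumes the placeholder "path_to_data" from the mail and uses a plain SQLContext read rather than the lower-level Job/FileInputFormat setup above.)

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc  = new SparkContext(new SparkConf().setAppName("empty-partitions").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)

// Count records per input partition and report how many come back empty.
val sizes = sqlContext.read.parquet("path_to_data")
  .rdd
  .mapPartitions(it => Iterator(it.size))
  .collect()
println(s"${sizes.count(_ == 0)} empty partitions out of ${sizes.length}")
```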

Re: How to distribute non-serializable object in transform task or broadcast ?

2015-08-07 Thread Han JU
If the object is something like a utility object (say a DB connection handler), I often use: @transient lazy val someObj = MyFactory.getObj(...) So basically `@transient` tells the closure cleaner not to serialize this, and the `lazy val` allows it to be initialized on each executor upon its
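(A self-contained sketch of the pattern; Handle and HandleFactory are stand-ins for whatever non-serializable object and factory you actually have.)

```
import org.apache.spark.rdd.RDD

// Stand-ins for a non-serializable handle and its factory.
class Handle(url: String) { def lookup(x: String): String = x.toUpperCase }
object HandleFactory { def create(url: String): Handle = new Handle(url) }

class Enricher(url: String) extends Serializable {
  // @transient: not serialized with the closure;
  // lazy val: re-created on each executor JVM the first time it is used there.
  @transient lazy val handle: Handle = HandleFactory.create(url)
  def enrich(x: String): String = handle.lookup(x)
}

def run(input: RDD[String]): RDD[String] = {
  val enricher = new Enricher("jdbc://example")   // hypothetical connection string
  input.map(enricher.enrich)                      // enricher is shipped, handle is not
}
```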

Submit application to spark on mesos cluster

2014-12-09 Thread Han JU
Hi, I have a problem submitting our application to a Mesos cluster. Basically the Mesos cluster is configured and I'm able to get spark-shell working correctly. Then I tried to launch our application jar (an uber jar built with sbt assembly, containing all deps): bin/spark-submit --master

Re: mounting SSD devices of EC2 r3.8xlarge instances

2014-06-04 Thread Han JU
For SSDs in r3 instances, it may be better to mount with the `discard` option since they support TRIM. What I did for r3.large: echo '/dev/xvdb /mnt ext4 defaults,noatime,nodiratime,discard 0 0' >> /etc/fstab mkfs.ext4 /dev/xvdb mount /dev/xvdb 2014-06-03 19:15 GMT+02:00 Matei Zaharia

Re: Shuffle file consolidation

2014-05-23 Thread Han JU
Hi Nathan, There's some explanation in the Spark configuration section: ``` If set to true, consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to true
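(The quoted description is the one for spark.shuffle.consolidateFiles in the docs of that era; the setting applies to the Spark versions discussed in this thread and no longer exists in recent releases. A sketch of turning it on:)

```
import org.apache.spark.{SparkConf, SparkContext}

// Enable shuffle file consolidation (Spark 1.x era hash-shuffle setting).
val conf = new SparkConf()
  .setAppName("shuffle-consolidation")
  .set("spark.shuffle.consolidateFiles", "true")
val sc = new SparkContext(conf)
```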

Re: Worker re-spawn and dynamic node joining

2014-05-20 Thread Han JU
for internal purposes, it is built on top of the spark-ec2 script (with some changes) and it has a module for adding/removing worker nodes on the fly. It looks like the attached screenshot. If you want, I can give you some access. Thanks Best Regards On Wed, May 14, 2014 at 9:52 PM, Han JU

Re: Text file and shuffle

2014-05-18 Thread Han JU
I think the shuffle is unavoidable given that the input partitions (probably Hadoop input splits in your case) are not arranged the way a cogroup job needs them. But maybe you can try: 1) co-partition your data for cogroup: val par = new HashPartitioner(128) val big =
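(A sketch of suggestion 1, with toy data standing in for the two inputs and assuming an existing SparkContext sc.)

```
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair-RDD functions on older Spark versions

// Toy key/value data standing in for the two inputs in the thread.
val big   = sc.parallelize(1 to 1000000).map(i => (i % 1000, i))
val small = sc.parallelize(1 to 1000).map(i => (i, i.toString))

val par = new HashPartitioner(128)
val bigPart   = big.partitionBy(par)     // one shuffle here...
val smallPart = small.partitionBy(par)   // ...and one here

// Both sides now share the same partitioner, so cogroup does not need to
// shuffle either of them again.
val grouped = bigPart.cogroup(smallPart)
```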

Worker re-spawn and dynamic node joining

2014-05-14 Thread Han JU
Hi all, just two questions: 1. Is there a way to automatically re-spawn Spark workers? We have situations where an executor OOM causes the worker process to be DEAD and it does not come back automatically. 2. How can we dynamically add (or remove) worker machines to (from) the cluster? We'd like to

Re: How to read a multipart s3 file?

2014-05-07 Thread Han JU
Just some complements to the other answers: if you output to, say, `s3://bucket/myfile`, then you can use this path as the input of other jobs (sc.textFile('s3://bucket/myfile')). By default all `part-xxx` files will be used. There's also `sc.wholeTextFiles` that you can play with. If your file is
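(A sketch of both points, assuming an existing SparkContext sc and the s3://bucket/myfile path from the mail.)

```
// All part-xxxxx files under the prefix are read as a single RDD of lines.
val in = sc.textFile("s3://bucket/myfile")

// wholeTextFiles gives one (path, content) pair per file, useful if you need
// to know which part each record came from (reasonable for small files only).
val perFile = sc.wholeTextFiles("s3://bucket/myfile")
perFile.map { case (path, content) => (path, content.length) }.collect().foreach(println)
```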

No space left on device error when pulling data from s3

2014-05-06 Thread Han JU
Hi, I get a `no space left on device` exception when pulling some 22GB of data from S3 block storage to the ephemeral HDFS. The cluster is on EC2, launched with the spark-ec2 script with 4 m1.large instances. The code is basically: val in = sc.textFile("s3://...") in.saveAsTextFile("hdfs://...") Spark creates 750 input

Re: No space left on device error when pulling data from s3

2014-05-06 Thread Han JU
write temp files to /tmp/hadoop-root ? 2014-05-06 18:05 GMT+02:00 Han JU ju.han.fe...@gmail.com: Hi, I've a `no space left on device` exception when pulling some 22GB data from s3 block storage to the ephemeral HDFS. The cluster is on EC2 using spark-ec2 script with 4 m1.large. The code
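(Not stated in the thread, but the usual thing to check in this situation is where Spark itself writes its shuffle/spill files: spark.local.dir defaults to /tmp, which sits on the small root volume of many EC2 images. A hedged sketch pointing it at the large ephemeral mounts; the /mnt paths are an assumption about the spark-ec2 layout.)

```
import org.apache.spark.{SparkConf, SparkContext}

// Put Spark's shuffle/temp files on the big ephemeral disks instead of /tmp.
// The mount points below are assumptions; use whatever the instances expose.
val conf = new SparkConf()
  .setAppName("s3-to-hdfs")
  .set("spark.local.dir", "/mnt/spark,/mnt2/spark")
val sc = new SparkContext(conf)
```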

Fwd: Spark RDD cache memory usage

2014-04-29 Thread Han JU
Hi, by default a fraction of the executor memory (60%) is reserved for RDD caching. So if there's no explicit caching in the code (e.g. rdd.cache()), or if we persist RDDs with StorageLevel.DISK_ONLY, is this part of the memory wasted? Does Spark allocate the RDD cache memory dynamically? Or does
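(The fraction in question is spark.storage.memoryFraction, 0.6 by default in the Spark versions of this thread. A sketch of turning it down for a job that never caches, so more of the heap is left for task execution; the 0.2 value is purely illustrative.)

```
import org.apache.spark.{SparkConf, SparkContext}

// Shrink the storage region for a job that does no rdd.cache()/persist().
val conf = new SparkConf()
  .setAppName("no-cache-job")
  .set("spark.storage.memoryFraction", "0.2")   // illustrative value, not a recommendation
val sc = new SparkContext(conf)
```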