Does Spark consider the free space of hard drive of the data nodes?

2017-02-08 Thread Benyi Wang
We are trying to add 6 spare servers to our existing cluster. Those
machines have more CPU cores and more memory. Unfortunately, 3 of them can
only take 2.5” hard drives, and the total capacity of each of those nodes is
about 7TB. The other 3 nodes can only take 3.5” hard drives, but have 48TB
each. In addition, the hard drive capacity of each node in the existing
cluster is 16TB.

I know this is not ideal, but we could not afford to buy enough 2.5” hard
drives to match 48TB.

I’m running Spark 2.0.1 on my CDH 5.3.2 cluster, will upgrade to Spark 2.1
soon, and usually start Spark in yarn-cluster mode.

Here are my questions:

   - I still want Spark executors running on all nodes in the cluster so
   that they are still useful during shuffles, but I want more data written to
   the nodes with larger hard drives when saving the data set to HDFS. Is that
   possible?
   - Will the data source output consider the free space on each node?

Thanks.


Re: Accessing log for lost executors

2016-12-02 Thread Benyi Wang
Usually your executors were killed by YARN for exceeding their memory limits.
You can check the NodeManager's log to see if your application got killed, or
use the command "yarn logs -applicationId " to download the logs.

On Thu, Dec 1, 2016 at 10:30 PM, Nisrina Luthfiyati <
nisrina.luthfiy...@gmail.com> wrote:

> Hi all,
>
> I'm trying to troubleshoot an ExecutorLostFailure issue.
> In Spark UI I noticed that executors tab only list active executors, is
> there any way that I can see the log for dead executors so that I can find
> out why it's dead/lost?
> I'm using Spark 1.5.2 on YARN 2.7.1.
>
> Thanks!
> Nisrina
>


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Benyi Wang
I think your dataframes are converted from RDDs. Are those RDDs computed or
read from files directly? I guess it might affect how Spark computes the
execution plan.

Try this: save the data frame that will be broadcast to HDFS, read it back
into a dataframe, then do the join and check the explain plan.
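
A rough sketch of that suggestion (the path is a placeholder; it uses the
broadcast function from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.broadcast

b.write.mode("overwrite").parquet("/tmp/broadcast_b")
val b2 = sqlContext.read.parquet("/tmp/broadcast_b")
val c = a.join(broadcast(b2), "id")
c.explain()  // check whether a BroadcastHashJoin shows up now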

On Sat, Nov 26, 2016 at 12:04 PM, Swapnil Shinde <swapnilushi...@gmail.com>
wrote:

> I am using Spark 1.6.3 and below is the real plan (a,b,c in above were
> just for illustration purpose)
>
> == Physical Plan ==
> Project [ltt#3800 AS ltt#3814,CASE WHEN isnull(etv_demo_id#3813) THEN
> mr_demo_id#3801 ELSE etv_demo_id#3813 AS etv_demo_id#3815]
> +- SortMergeOuterJoin [mr_demoname#3802,mr_demo_id#3801],
> [mr_demoname#3810,mr_demo_id#3811], LeftOuter, None
>:- Sort [mr_demoname#3802 ASC,mr_demo_id#3801 ASC], false, 0
>:  +- TungstenExchange 
> hashpartitioning(mr_demoname#3802,mr_demo_id#3801,200),
> None
>: +- Project [_1#3797 AS ltt#3800,_2#3798 AS
> mr_demo_id#3801,_3#3799 AS mr_demoname#3802]
>:+- Scan ExistingRDD[_1#3797,_2#3798,_3#3799]
>+- Sort [mr_demoname#3810 ASC,mr_demo_id#3811 ASC], false, 0
>   +- TungstenExchange 
> hashpartitioning(mr_demoname#3810,mr_demo_id#3811,200),
> None
>  +- Project [mr_demoname#3810,mr_demo_id#3811,etv_demo_id#3813]
> +- Project [demogroup#3803 AS mr_demoname#3810,demovalue#3804
> AS mr_demo_id#3811,demoname#3805 AS mr_demo_value#3812,demovalue_etv_map#3806
> AS etv_demo_id#3813]
>+- Filter ((map_type#3809 = master_roster_to_etv) && NOT
> (demogroup#3803 = gender_age_id))
>   +- Scan ExistingRDD[demogroup#3803,
> demovalue#3804,demoname#3805,demovalue_etv_map#3806,demoname_etv_map#3807,
> demovalue_old_map#3808,map_type#3809]
>
>
> Thanks
> Swapnil
>
> On Sat, Nov 26, 2016 at 2:32 PM, Benyi Wang <bewang.t...@gmail.com> wrote:
>
>> Could you post the result of explain `c.explain`? If it is broadcast
>> join, you will see it in explain.
>>
>> On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde <
>> swapnilushi...@gmail.com> wrote:
>>
>>> Hello
>>> I am trying a broadcast join on dataframes but it is still doing
>>> SortMergeJoin. I even try setting spark.sql.autoBroadcastJoinThreshold
>>> higher but still no luck.
>>>
>>> Related piece of code-
>>>   val c = a.join(broadcast(b), "id")
>>>
>>> On a side note, SizeEstimator.estimate(b) is really high (460956584 bytes)
>>> compared to the data it contains: b has just 85 rows and around 4964 bytes.
>>> Help is very much appreciated!!
>>>
>>> Thanks
>>> Swapnil
>>>
>>>
>>>
>>
>


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Benyi Wang
Could you post the result of explain `c.explain`? If it is broadcast join,
you will see it in explain.

On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde 
wrote:

> Hello
> I am trying a broadcast join on dataframes but it is still doing
> SortMergeJoin. I even try setting spark.sql.autoBroadcastJoinThreshold
> higher but still no luck.
>
> Related piece of code-
>   val c = a.join(broadcast(b), "id")
>
> On a side note, SizeEstimator.estimate(b) is really high (460956584 bytes)
> compared to the data it contains: b has just 85 rows and around 4964 bytes.
> Help is very much appreciated!!
>
> Thanks
> Swapnil
>
>
>


Does DeserializeToObject mean that a Row is deserialized to Java objects?

2016-11-07 Thread Benyi Wang
Below is my test code using Spark 2.0.1. DeserializeToObject doesn’t appear
in filter() but does in map(). Does that mean map() is not a Tungsten operation?

case class Event(id: Long)
val e1 = Seq(Event(1L), Event(2L)).toDS
val e2 = Seq(Event(2L), Event(3L)).toDS

e1.filter(e => e.id < 10 && e.id > 5).explain
// == Physical Plan ==
// *Filter <function1>.apply
// +- LocalTableScan [id#145L]

e1.map(e => e.id < 10 && e.id > 5).explain
// == Physical Plan ==
// *SerializeFromObject [input[0, boolean, true] AS value#155]
// +- *MapElements <function1>, obj#154: boolean
//    +- *DeserializeToObject newInstance(class $line41.$read$$iw$$iw$Event), obj#153: $line41.$read$$iw$$iw$Event
//       +- LocalTableScan [id#145L]

Another question: if I register a complex function as a UDF, in what
situations will DeserializeToObject/SerializeFromObject happen?

Thanks.
​


Re: classpath conflict with spark internal libraries and the spark shell.

2016-09-09 Thread Benyi Wang
I had a problem when I used "spark.executor.userClassPathFirst" before. I
don't remember what the problem was.

Alternatively, you can use --driver-class-path and "--conf
spark.executor.extraClassPath". Unfortunately you may feel frustrated like
me when trying to make it work.

It depends on how you run Spark:
- standalone or YARN,
- as an application or in spark-shell.
The configuration will be different in each case. It is hard to explain
briefly, so I wrote two blog posts about it:
http://ben-tech.blogspot.com/2016/05/how-to-resolve-spark-cassandra.html
http://ben-tech.blogspot.com/2016/04/how-to-resolve-spark-cassandra.html

Hope those blogs help.

If you still have a class conflict problem, you can consider loading the
external library and its dependencies with a separate classloader, just like
spark-hive does to load a specified version of the Hive jars.
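
For illustration only (the jar paths are placeholders), a YARN spark-shell
invocation that puts the external jar ahead of Spark's own copy on both the
driver and executor classpaths might look like this:

spark-shell --master yarn \
  --jars /path/to/httpclient-4.5.2.jar \
  --driver-class-path /path/to/httpclient-4.5.2.jar \
  --conf spark.executor.extraClassPath=./httpclient-4.5.2.jar

Whether this is enough depends on your deployment; the posts above go through
the standalone and YARN cases in detail.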

On Fri, Sep 9, 2016 at 2:53 PM, Colin Kincaid Williams 
wrote:

> My bad, gothos on IRC pointed me to the docs:
>
> http://jhz.name/2016/01/10/spark-classpath.html
>
> Thanks Gothos!
>
> On Fri, Sep 9, 2016 at 9:23 PM, Colin Kincaid Williams 
> wrote:
> > I'm using the spark shell v1.6.1. I have a classpath conflict, where I
> > have an external library ( not OSS either :( , can't rebuild it.)
> > using httpclient-4.5.2.jar. I use spark-shell --jars
> > file:/path/to/httpclient-4.5.2.jar
> >
> > However spark is using httpclient-4.3 internally. Then when I try to
> > use the external library I get
> >
> > getClass.getResource("/org/apache/http/conn/ssl/
> SSLConnectionSocketFactory.class");
> >
> > res5: java.net.URL =
> > jar:file:/opt/spark-1.6.1-bin-hadoop2.4/lib/spark-assembly-
> 1.6.1-hadoop2.4.0.jar!/org/apache/http/conn/ssl/
> SSLConnectionSocketFactory.class
> >
> > How do I get spark-shell on 1.6.1 to allow me to use the external
> > httpclient-4.5.2.jar for my application, and ignore its internal
> > library? Or is this not possible?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: The error to read HDFS custom file in spark.

2016-03-19 Thread Benyi Wang
I would say change

class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord]
extends FileInputFormat

to

class RawDataInputFormat[LongWritable, RDRawDataRecord] extends FileInputFormat

​

On Thu, Mar 17, 2016 at 9:48 AM, Mich Talebzadeh 
wrote:

> Hi Tony,
>
> Is
>
> com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>
> One of your own packages?
>
> Sounds like it is the one throwing the error.
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 17 March 2016 at 15:21, Tony Liu  wrote:
>
>> Hi,
>>My HDFS file is stored with custom data structures. I want to read it
>> with a SparkContext object, so I define a formatting object:
>>
>> *1. code of RawDataInputFormat.scala*
>>
>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>> import org.apache.hadoop.io.LongWritable
>> import org.apache.hadoop.mapred._
>>
>> /**
>>   * Created by Tony on 3/16/16.
>>   */
>> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends 
>> FileInputFormat {
>>
>>   override def getRecordReader(split: InputSplit, job: JobConf, reporter: 
>> Reporter): RecordReader[LW, RD] = {
>> new RawReader(split, job, reporter)
>>   }
>>
>> }
>>
>> *2. code of RawReader.scala*
>>
>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>> import org.apache.hadoop.io.{LongWritable, SequenceFile}
>> import org.apache.hadoop.mapred._
>>
>> /**
>>   * Created by Tony on 3/17/16.
>>   */
>> class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends 
>> RecordReader[LW, RD] {
>>
>>   var reader: SequenceFile.Reader = null
>>   var currentPos: Long = 0L
>>   var length: Long = 0L
>>
>>   def this(split: InputSplit, job: JobConf, reporter: Reporter) {
>> this()
>> val p = (split.asInstanceOf[FileSplit]).getPath
>> reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
>>   }
>>
>>   override def next(key: LW, value: RD): Boolean = {
>> val flag = reader.next(key, value)
>> currentPos = reader.getPosition()
>> flag
>>   }
>>
>>   override def getProgress: Float = Math.min(1.0f, currentPos / 
>> length.toFloat)
>>
>>   override def getPos: Long = currentPos
>>
>>   override def createKey(): LongWritable = {
>> new LongWritable()
>>   }
>>
>>   override def close(): Unit = {
>> reader.close()
>>   }
>>
>>   override def createValue(): RDRawDataRecord = {
>> new RDRawDataRecord()
>>   }
>> }
>>
>> *3. code of RDRawDataRecord.scala*
>>
>> import com.kiisoo.aegis.common.rawdata.RawDataRecord;
>> import java.io.DataInput;
>> import java.io.DataOutput;
>> import java.io.IOException;
>> import org.apache.commons.lang.StringUtils;
>> import org.apache.hadoop.io.Writable;
>>
>> public class RDRawDataRecord implements Writable {
>> private String smac;
>> private String dmac;
>> private int hrssi;
>> private int lrssi;
>> private long fstamp;
>> private long lstamp;
>> private long maxstamp;
>> private long minstamp;
>> private long stamp;
>>
>> public void readFields(DataInput in) throws IOException {
>> this.smac = in.readUTF();
>> this.dmac = in.readUTF();
>> this.hrssi = in.readInt();
>> this.lrssi = in.readInt();
>> this.fstamp = in.readLong();
>> this.lstamp = in.readLong();
>> this.maxstamp = in.readLong();
>> this.minstamp = in.readLong();
>> this.stamp = in.readLong();
>> }
>>
>> public void write(DataOutput out) throws IOException {
>> out.writeUTF(StringUtils.isNotBlank(this.smac)?this.smac:"");
>> out.writeUTF(StringUtils.isNotBlank(this.dmac)?this.dmac:"");
>> out.writeInt(this.hrssi);
>> out.writeInt(this.lrssi);
>> out.writeLong(this.fstamp);
>> out.writeLong(this.lstamp);
>> out.writeLong(this.maxstamp);
>> out.writeLong(this.minstamp);
>> out.writeLong(this.stamp);
>> }
>>
>> */** *
>>
>> *ignore getter setter*
>>
>> ***/*
>>
>> }
>>
>> *At last, I use this code to run*:
>>
>> val filePath = 
>> "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
>> val conf = new SparkConf()
>> conf.setMaster("local")
>> conf.setAppName("demo")
>> val sc = new SparkContext(conf)
>> val file = sc.hadoopFile[LongWritable, RDRawDataRecord, 
>> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>> file.foreach(v => {
>>   println(v._2.getDmac) // Attribute of custom objects
>> })
>>
>> *I get an error, it says:*
>>
>> Error:(41, 19) type arguments 
>> [org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
>>  conform to the 

Re: How to control the number of files for dynamic partition in Spark SQL?

2016-02-01 Thread Benyi Wang
Thanks Deenar, both methods work.

I actually tried the second method in spark-shell, but it didn't work at
that time. The reason might be that I registered the data frame eventwk as a
temporary table, repartitioned it, then registered the table again.
Unfortunately I could not reproduce the failure.
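
A minimal sketch of the flow that works (names follow the thread below;
sqlContext is assumed):

val eventwk = sqlContext.sql("some joins")  // uses the global shuffle partitions
val eventwkRepartitioned = eventwk.repartition(2)
eventwkRepartitioned.registerTempTable("event_wk_repartitioned")
sqlContext.sql(
  """
    |insert overwrite table event
    |partition(eventDate)
    |select user, detail, eventDate
    |from event_wk_repartitioned
  """.stripMargin)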

Thanks again.

On Sat, Jan 30, 2016 at 1:25 AM, Deenar Toraskar <deenar.toras...@gmail.com>
wrote:

> The following should work as long as your tables are created using Spark
> SQL
>
> event_wk.repartition(2).write.partitionBy("eventDate").format("parquet")
>   .insertInto("event")
>
> If you want to stick to using "insert overwrite" for Hive compatibility,
> then you can repartition twice, instead of setting the global
> spark.sql.shuffle.partition parameter
>
> df eventwk = sqlContext.sql("some joins") // this should use the global
> shuffle partition parameter
> df eventwkRepartitioned = eventwk.repartition(2)
> eventwkRepartitioned.registerTempTable("event_wk_repartitioned")
> and use this in your insert statement.
>
> registering temp table is cheap
>
> HTH
>
>
> On 29 January 2016 at 20:26, Benyi Wang <bewang.t...@gmail.com> wrote:
>
>> I want to insert into a partition table using dynamic partition, but I
>> don’t want to have 200 files for a partition because the files will be
>> small for my case.
>>
>> sqlContext.sql(  """
>> |insert overwrite table event
>> |partition(eventDate)
>> |select
>> | user,
>> | detail,
>> | eventDate
>> |from event_wk
>>   """.stripMargin)
>>
>> the table “event_wk” is created from a dataframe by registerTempTable,
>> which is built with some joins. If I set spark.sql.shuffle.partition=2, the
>> join’s performance will be bad because that property seems global.
>>
>> I can do something like this:
>>
>> event_wk.repartition(2).write.partitionBy("eventDate").format("parquet").save(path)
>>
>> but I have to handle adding partitions by myself.
>>
>> Is there a way you can control the number of files just for this last
>> insert step?
>> ​
>>
>
>


How to control the number of files for dynamic partition in Spark SQL?

2016-01-29 Thread Benyi Wang
I want to insert into a partitioned table using dynamic partitioning, but I
don’t want 200 files per partition because the files will be small in my
case.

sqlContext.sql(  """
|insert overwrite table event
|partition(eventDate)
|select
| user,
| detail,
| eventDate
|from event_wk
  """.stripMargin)

the table “event_wk” is created by registerTempTable from a dataframe, which
is built with some joins. If I set spark.sql.shuffle.partitions=2, the join’s
performance will be bad because that property seems to be global.

I can do something like this:

event_wk.repartition(2).write.partitionBy("eventDate").format("parquet").save(path)

but I have to handle adding partitions by myself.

Is there a way to control the number of files just for this last insert
step?
​


Re: How to write a custom window function?

2016-01-28 Thread Benyi Wang
Never mind.

GenericUDAFCollectList supports struct in 1.3.0. I modified it and it works
in a tricky way.

I also found an example HiveWindowFunction.

On Thu, Jan 28, 2016 at 12:49 PM, Benyi Wang <bewang.t...@gmail.com> wrote:

> I'm trying to implement a WindowFunction like collect_list, but I have to
> collect a struct. collect_list works only for primitive type.
>
> I think I might modify GenericUDAFCollectList, but haven't tried it yet.
>
> I'm wondering if there is an example showing how to write a custom
> WindowFunction in Spark-sql
>
> Thanks.
>
>
>


How to write a custom window function?

2016-01-28 Thread Benyi Wang
I'm trying to implement a WindowFunction like collect_list, but I have to
collect a struct. collect_list works only for primitive types.

I think I might modify GenericUDAFCollectList, but haven't tried it yet.

I'm wondering if there is an example showing how to write a custom
WindowFunction in Spark-sql

Thanks.


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Benyi Wang
   - I assume your parquet files are compressed. Gzip or Snappy?
   - What spark version did you use? It seems at least 1.4. If you use
   spark-sql and tungsten, you might have better performance. but spark 1.5.2
   gave me a wrong result when the data was about 300~400GB, just for a simple
   group-by and aggregate.
   - Did you use Kryo serialization?
   - you should have spark.shuffle.compress=true, verify it.
   - How many tasks did you use? spark.default.parallelism=?
   - What about this (a rough sketch follows this list):
  - Read the data day by day
  - compute a bucket id from timestamp, e.g., the date and hour
  - Write into different buckets (you probably need a special writer to
  write data efficiently without shuffling the data).
  - distinct for each bucket. Because each bucket is small, spark can
  get it done faster than having everything in one run.
  - I think using groupBy (userId, timestamp) might be better than
  distinct. I guess distinct() will compare every field.
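
A rough sketch of the bucketing idea above (column names, the bucket
expression, and paths are assumptions, not taken from your data):

import org.apache.spark.sql.functions._

// read one day, derive a bucket id from the timestamp (adjust this if your
// TimeStamp column is an epoch number rather than a timestamp/string)
val day = sqlContext.read.parquet("/events/raw/2016-01-06")
val bucketed = day.withColumn("bucket", date_format(col("TimeStamp"), "yyyy-MM-dd-HH"))
bucketed.write.partitionBy("bucket").parquet("/events/bucketed/2016-01-06")

// later, dedup one bucket at a time instead of everything in one run
val oneBucket = sqlContext.read.parquet("/events/bucketed/*/bucket=2016-01-06-00")
oneBucket.dropDuplicates(Seq("UserID", "TimeStamp"))
  .write.parquet("/events/dedup/2016-01-06-00")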


On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue  wrote:

> And the most frequent operation I am gonna do is find the UserID who have
> some events, then retrieve all the events associted with the UserID.
>
> In this case, how should I partition to speed up the process?
>
> Thanks.
>
> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue  wrote:
>
>> hey Ted,
>>
>> Event table is like this: UserID, EventType, EventKey, TimeStamp,
>> MetaData.  I just parse it from Json and save as Parquet, did not change
>> the partition.
>>
>> Annoyingly, every day's incoming Event data having duplicates among each
>> other.  One same event could show up in Day1 and Day2 and probably Day3.
>>
>> I only want to keep single Event table and each day it come so many
>> duplicates.
>>
>> Is there a way I could just insert into Parquet and if duplicate found,
>> just ignore?
>>
>> Thanks,
>> Gavin
>>
>>
>>
>>
>>
>>
>>
>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu  wrote:
>>
>>> Is your Parquet data source partitioned by date ?
>>>
>>> Can you dedup within partitions ?
>>>
>>> Cheers
>>>
>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue 
>>> wrote:
>>>
 I tried on Three day's data.  The total input is only 980GB, but the
 shuffle write Data is about 6.2TB, then the job failed during shuffle read
 step, which should be another 6.2TB shuffle read.

 I think to Dedup, the shuffling can not be avoided. Is there anything I
 could do to stablize this process?

 Thanks.


 On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue 
 wrote:

> Hey,
>
> I got everyday's Event table and want to merge them into a single
> Event table. But there so many duplicates among each day's data.
>
> I use Parquet as the data source.  What I am doing now is
>
> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet
> file").
>
> Each day's Event is stored in their own Parquet file
>
> But it failed at the stage2 which keeps losing connection to one
> executor. I guess this is due to the memory issue.
>
> Any suggestion how I do this efficiently?
>
> Thanks,
> Gavin
>


>>>
>>
>


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Benyi Wang
Just try 1000, or even 2000, to see if it works. If you see something like
"Lost Executor", you'd better stop your job, otherwise you are wasting time.
Usually the container of the lost executor is killed by the NodeManager
because there is not enough memory. You can check the NodeManager's log to
confirm it.

There are a couple of parameters that may affect shuffle performance:

--num-executors: use a larger number, e.g., 2 x #data nodes
--executor-cores: use a small number, e.g., 3 or 4
--executor-memory: #cores x (memory for one core)

Also consider increasing spark.shuffle.memoryFraction.

With a larger number of spark.sql.shuffle.partitions, each partition (task)
will be smaller and fit in the memory available to one core. If the
partitions are too large, the performance might be worse. You have to
experiment based on your cluster's nodes and memory.
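
For illustration only (all numbers are placeholders to adapt to your cluster,
and the jar name is hypothetical), the submit command might look like this:

spark-submit --master yarn-cluster \
  --num-executors 40 \
  --executor-cores 4 \
  --executor-memory 16g \
  --conf spark.shuffle.compress=true \
  --conf spark.shuffle.memoryFraction=0.4 \
  --conf spark.sql.shuffle.partitions=1000 \
  dedup-job.jar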

On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> I used to maintain a HBase cluster. The experience with it was not happy.
>
> I just tried query the data  from each day's first and dedup with smaller
> set, the performance is acceptable.  So I guess I will use this method.
>
> Again, could anyone give advice about:
>
>- Automatically determine the number of reducers for joins and
>groupbys: Currently in Spark SQL, you need to control the degree of
>parallelism post-shuffle using “SET
>spark.sql.shuffle.partitions=[num_tasks];”.
>
> Thanks.
>
> Gavin
>
>
>
>
> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> bq. in an noSQL db such as Hbase
>>
>> +1 :-)
>>
>>
>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> One option you may want to explore is writing event table in an noSQL db
>>> such as Hbase. One inherent problem in your approach is you always need to
>>> load either full data set or a defined number of partitions to see if the
>>> event has already come (and no gurantee it is full proof, but lead to
>>> unnecessary loading in most cases).
>>>
>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com>
>>> wrote:
>>>
>>>> Hey,
>>>> Thank you for the answer. I checked the setting you mentioend they are
>>>> all correct.  I noticed that in the job, there are always only 200 reducers
>>>> for shuffle read, I believe it is setting in the sql shuffle parallism.
>>>>
>>>> In the doc, it mentions:
>>>>
>>>>- Automatically determine the number of reducers for joins and
>>>>groupbys: Currently in Spark SQL, you need to control the degree of
>>>>parallelism post-shuffle using “SET
>>>>spark.sql.shuffle.partitions=[num_tasks];”.
>>>>
>>>>
>>>> What would be the ideal number for this setting? Is it based on the
>>>> hardware of cluster?
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Gavin
>>>>
>>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>>- I assume your parquet files are compressed. Gzip or Snappy?
>>>>>- What spark version did you use? It seems at least 1.4. If you
>>>>>use spark-sql and tungsten, you might have better performance. but 
>>>>> spark
>>>>>1.5.2 gave me a wrong result when the data was about 300~400GB, just 
>>>>> for a
>>>>>simple group-by and aggregate.
>>>>>- Did you use kyro serialization?
>>>>>- you should have spark.shuffle.compress=true, verify it.
>>>>>- How many tasks did you use? spark.default.parallelism=?
>>>>>- What about this:
>>>>>   - Read the data day by day
>>>>>   - compute a bucket id from timestamp, e.g., the date and hour
>>>>>   - Write into different buckets (you probably need a special
>>>>>   writer to write data efficiently without shuffling the data).
>>>>>   - distinct for each bucket. Because each bucket is small, spark
>>>>>   can get it done faster than having everything in one run.
>>>>>   - I think using groupBy (userId, timestamp) might be better
>>>>>   than distinct. I guess distinct() will compare every field.
>>>>>
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> And the most fre

Re: How to make this Spark 1.5.2 code fast and shuffle less data

2015-12-10 Thread Benyi Wang
DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
DataFrame frameToProcess = sourceFrame.except(filterFrame1);

except is really expensive. Do you actually want this:

sourceFrame.filter(! col("col1").contains("xyz"))
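
In the Java API used in the code quoted below, the same filter can be written
roughly like this (a sketch; not is the negation helper in
org.apache.spark.sql.functions, and sourceFrame comes from your code):

import static org.apache.spark.sql.functions.*;

DataFrame frameToProcess = sourceFrame.filter(not(col("col1").contains("xyz")));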

​

On Thu, Dec 10, 2015 at 9:57 AM, unk1102  wrote:

> Hi I have spark job which reads Hive-ORC data and processes and generates
> csv
> file in the end. Now this ORC files are hive partitions and I have around
> 2000 partitions to process every day. These hive partitions size is around
> 800 GB in HDFS. I have the following method code which I call it from a
> thread spawn from spark driver. So in this case 2000 threads gets processed
> and those runs painfully slow around 12 hours making huge data shuffling
> each executor shuffles around 50 GB of data. I am using 40 executors of 4
> core and 30 GB memory each. I am using Hadoop 2.6 and Spark 1.5.2 release.
>
> public void callThisFromThread() {
> DataFrame sourceFrame =
> hiveContext.read().format("orc").load("/path/in/hdfs");
> DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
> DataFrame frameToProcess = sourceFrame.except(filterFrame1);
> JavaRDD updatedRDD = frameToProcess.toJavaRDD().mapPartitions() {
> .
> }
> DataFrame updatedFrame =
> hiveContext.createDataFrame(updatedRdd,sourceFrame.schema());
> DataFrame selectFrame = updatedFrame.select("col1","col2...","col8");
> DataFrame groupFrame =
> selectFrame.groupBy("col1","col2","col8").agg("..");//8 column
> group
> by
> groupFrame.coalesce(1).save();//save as csv only one file so coalesce(1)
> }
>
> Please guide me how can I optimize above code I cant avoid group by which
> is
> evil I know I have to do group on 8 fields mentioned above.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-this-Spark-1-5-2-code-fast-and-shuffle-less-data-tp25671.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to make this Spark 1.5.2 code fast and shuffle less data

2015-12-10 Thread Benyi Wang
I don't understand this: "I have the following method code which I call it
from a thread spawn from spark driver. So in this case 2000 threads ..."

Why do you call it from a thread?
Are you processing one partition per thread?

On Thu, Dec 10, 2015 at 11:13 AM, Benyi Wang <bewang.t...@gmail.com> wrote:

> DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
> DataFrame frameToProcess = sourceFrame.except(filterFrame1);
>
> except is really expensive. Do you actually want this:
>
> sourceFrame.filter(! col("col1").contains("xyz"))
>
> ​
>
> On Thu, Dec 10, 2015 at 9:57 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>
>> Hi I have spark job which reads Hive-ORC data and processes and generates
>> csv
>> file in the end. Now this ORC files are hive partitions and I have around
>> 2000 partitions to process every day. These hive partitions size is around
>> 800 GB in HDFS. I have the following method code which I call it from a
>> thread spawn from spark driver. So in this case 2000 threads gets
>> processed
>> and those runs painfully slow around 12 hours making huge data shuffling
>> each executor shuffles around 50 GB of data. I am using 40 executors of 4
>> core and 30 GB memory each. I am using Hadoop 2.6 and Spark 1.5.2 release.
>>
>> public void callThisFromThread() {
>> DataFrame sourceFrame =
>> hiveContext.read().format("orc").load("/path/in/hdfs");
>> DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
>> DataFrame frameToProcess = sourceFrame.except(filterFrame1);
>> JavaRDD updatedRDD = frameToProcess.toJavaRDD().mapPartitions() {
>> .
>> }
>> DataFrame updatedFrame =
>> hiveContext.createDataFrame(updatedRdd,sourceFrame.schema());
>> DataFrame selectFrame = updatedFrame.select("col1","col2...","col8");
>> DataFrame groupFrame =
>> selectFrame.groupBy("col1","col2","col8").agg("..");//8 column
>> group
>> by
>> groupFrame.coalesce(1).save();//save as csv only one file so coalesce(1)
>> }
>>
>> Please guide me how can I optimize above code I cant avoid group by which
>> is
>> evil I know I have to do group on 8 fields mentioned above.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-this-Spark-1-5-2-code-fast-and-shuffle-less-data-tp25671.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to implement zipWithIndex as a UDF?

2015-10-28 Thread Benyi Wang
Thanks Michael.

I should make my question more clear. This is the data type:

StructType(Seq(
  StructField("uid", LongType),
  StructField("infos", ArrayType(
    StructType(Seq(
      StructField("cid", LongType),
      StructField("cnt", LongType)
    ))
  ))
))

I want to explode “infos” to get three columns “uid”, “index” and “info”.
The only way I figured out is to explode the whole nested data type into a
tuple of primitive data types like this:

df.explode("infos") { (r: Row) =>
val arr = row.getSeq[Row](0)
arr.zipWithIndex.map {
  case (info, idx) =>
(idx, info.getLong(0), info.getLong(1))
}

What I really want is to keep info as a struct type.

df.explode("infos") { (r: Row) =>
val arr = row.getSeq[Row](0)
arr.zipWithIndex.map {
  case (info, idx) =>
(idx, info)
}

Unfortunately the current DataFrame API doesn’t support it: the explode
methods try to figure out the schema for the exploded data, but they cannot
handle Any or Row types via reflection, and the caller has no way to pass a
schema for the exploded data.
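
A workaround sketch that keeps each element as a struct (going through the
RDD API and supplying the schema explicitly; df and sqlContext are assumed to
be the dataframe and context from above):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val infoType = StructType(Seq(
  StructField("cid", LongType),
  StructField("cnt", LongType)))

val explodedSchema = StructType(Seq(
  StructField("uid", LongType),
  StructField("index", IntegerType),
  StructField("info", infoType)))

// keep "info" as a nested Row so the result column stays a struct
val explodedRdd = df.rdd.flatMap { r =>
  val uid = r.getLong(0)
  r.getSeq[Row](1).zipWithIndex.map { case (info, idx) => Row(uid, idx, info) }
}

val exploded = sqlContext.createDataFrame(explodedRdd, explodedSchema)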
​

On Fri, Oct 23, 2015 at 12:44 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> The user facing type mapping is documented here:
> http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types
>
> On Fri, Oct 23, 2015 at 12:10 PM, Benyi Wang <bewang.t...@gmail.com>
> wrote:
>
>> If I have two columns
>>
>> StructType(Seq(
>>   StructField("id", LongType),
>>   StructField("phones", ArrayType(StringType
>>
>> I want to add index for “phones” before I explode it.
>>
>> Can this be implemented as GenericUDF?
>>
>> I tried DataFrame.explode. It worked for simple types like string, but I
>> could not figure out how to handle a nested type like StructType.
>>
>> Can somebody shed a light?
>>
>> I’m using spark 1.5.1.
>> ​
>>
>
>


How to implement zipWithIndex as a UDF?

2015-10-23 Thread Benyi Wang
If I have two columns

StructType(Seq(
  StructField("id", LongType),
  StructField("phones", ArrayType(StringType))
))

I want to add index for “phones” before I explode it.

Can this be implemented as GenericUDF?

I tried DataFrame.explode. It worked for simple types like string, but I
could not figure out how to handle a nested type like StructType.

Can somebody shed a light?

I’m using spark 1.5.1.
​


MatrixFactorizationModel.save got StackOverflowError

2015-08-13 Thread Benyi Wang
I'm using spark-1.4.1 compiled against CDH 5.3.2. When I use
ALS.trainImplicit to build a model, I get this error when rank=40 and
iterations=30.


It worked for (rank=10, iteration=10) and (rank=20, iteration=20).


What was wrong with (rank=40, iterations=30)?



15/08/13 01:16:40 INFO scheduler.DAGScheduler: Got job 66
(saveAsTextFile at MatrixFactorizationModel.scala:283) with 1 output
partitions (allowLocal=false)

15/08/13 01:16:40 INFO scheduler.DAGScheduler: Final stage:
ResultStage 2394(saveAsTextFile at MatrixFactorizationModel.scala:283)


...

15/08/13 01:16:41 INFO scheduler.DAGScheduler: Job 66 finished:
saveAsTextFile at MatrixFactorizationModel.scala:283, took 0.538016 s


...


15/08/13 01:16:42 INFO parquet.ParquetRelation2: Using default output
committer for Parquet: parquet.hadoop.ParquetOutputCommitter
15/08/13 01:16:42 INFO sources.DefaultWriterContainer: Using user
defined output committer class parquet.hadoop.ParquetOutputCommitter
15/08/13 01:16:42 INFO spark.SparkContext: Starting job: parquet at
MatrixFactorizationModel.scala:284
15/08/13 01:16:42 INFO scheduler.DAGScheduler: Got job 67 (parquet at
MatrixFactorizationModel.scala:284) with 432 output partitions
(allowLocal=false)
15/08/13 01:16:42 INFO scheduler.DAGScheduler: Final stage:
ResultStage 2460(parquet at MatrixFactorizationModel.scala:284)
15/08/13 01:16:42 INFO scheduler.DAGScheduler: Parents of final stage:
List(ShuffleMapStage 2459, ShuffleMapStage 2398)
15/08/13 01:16:42 INFO scheduler.DAGScheduler: Missing parents: List()
15/08/13 01:16:42 INFO scheduler.DAGScheduler: Submitting ResultStage
2460 (MapPartitionsRDD[634] at parquet at
MatrixFactorizationModel.scala:284), which has no missing parents
15/08/13 01:16:42 INFO cluster.YarnClusterScheduler: Cancelling stage 2460
15/08/13 01:16:42 INFO scheduler.DAGScheduler: ResultStage 2460
(parquet at MatrixFactorizationModel.scala:284) failed in Unknown s
15/08/13 01:16:42 INFO scheduler.DAGScheduler: Job 67 failed: parquet
at MatrixFactorizationModel.scala:284, took 0.249275 s
15/08/13 01:16:42 ERROR sources.InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure:
Task serialization failed: java.lang.StackOverflowError
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1533)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
scala.collection.immutable.$colon$colon.writeObject(List.scala:379)


...


15/08/13 01:16:42 ERROR yarn.ApplicationMaster: User class threw
exception: org.apache.spark.SparkException: Job aborted.
org.apache.spark.SparkException: Job aborted.
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:166)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:139)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at 
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:336)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
at 
org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
at 
org.apache.spark.mllib.recommendation.MatrixFactorizationModel$SaveLoadV1_0$.save(MatrixFactorizationModel.scala:284)
at 
org.apache.spark.mllib.recommendation.MatrixFactorizationModel.save(MatrixFactorizationModel.scala:141)
at 
com.mycompany.recommendation.ModelTrainer.train(ModelTrainer.scala:49)
at com.mycompany.recommendation.ModelTrainer$.run(ModelTrainer.scala:96)
at 

Re: Spark Maven Build

2015-08-10 Thread Benyi Wang
Never mind. Instead of setting the property in the profile

<profile>
  <id>cdh5.3.2</id>
  <properties>
    <hadoop.version>2.5.0-cdh5.3.2</hadoop.version>
    ...
  </properties>
</profile>

I have to change the property hadoop.version from 2.2.0 to 2.5.0-cdh5.3.2
in spark-parent's pom.xml. Otherwise, Maven will resolve transitive
dependencies using the default version 2.2.0.
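
For reference, overriding the property on the command line (a sketch
following the standard Spark build convention) achieves the same thing
without editing the pom:

mvn -Pcdh5.3.2,yarn,hive -Dhadoop.version=2.5.0-cdh5.3.2 -DskipTests clean install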

On Fri, Aug 7, 2015 at 8:45 PM, Benyi Wang bewang.t...@gmail.com wrote:

 I'm trying to build spark 1.4.1 against CDH 5.3.2. I created a profile
 called cdh5.3.2 in spark_parent.pom, made some changes for
 sql/hive/v0.13.1, and the build finished successfully.

 Here is my problem:

- If I run `mvn -Pcdh5.3.2,yarn,hive install`, the artifacts are
installed into my local repo.
- I expected `hadoop-client` version should be
`hadoop-client-2.5.0-cdh5.3.2`, but it actually `hadoop-client-2.2.0`.

 If I add a dependency of `spark-sql-1.2.0-cdh5.3.2`, the version is
 `hadoop-client-2.5.0-cdh5.3.2`.

 What's the trick behind it?




Spark Maven Build

2015-08-07 Thread Benyi Wang
I'm trying to build spark 1.4.1 against CDH 5.3.2. I created a profile
called cdh5.3.2 in spark_parent.pom, made some changes for
sql/hive/v0.13.1, and the build finished successfully.

Here is my problem:

   - If I run `mvn -Pcdh5.3.2,yarn,hive install`, the artifacts are
   installed into my local repo.
   - I expected the `hadoop-client` version to be
   `hadoop-client-2.5.0-cdh5.3.2`, but it is actually `hadoop-client-2.2.0`.

If I add a dependency of `spark-sql-1.2.0-cdh5.3.2`, the version is
`hadoop-client-2.5.0-cdh5.3.2`.

What's the trick behind it?


Re: Does Spark automatically run different stages concurrently when possible?

2015-01-10 Thread Benyi Wang
You may try changing the schedulingMode to FAIR; the default is FIFO. Take
a look at this page:

https://spark.apache.org/docs/1.1.0/job-scheduling.html#scheduling-within-an-application
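
If it helps, here is a sketch of running the two branches concurrently from
one application (assuming spark.scheduler.mode is set to FAIR in the
SparkConf, processA/processB are the RDDs from your code, and the output
paths are placeholders). Each action is submitted from its own thread, so the
scheduler can run the two independent stages at the same time:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val fa = Future { processA.saveAsTextFile("/out/processA") }
val fb = Future { processB.saveAsTextFile("/out/processB") }
Await.result(Future.sequence(Seq(fa, fb)), Duration.Inf)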



On Sat, Jan 10, 2015 at 10:24 AM, YaoPau jonrgr...@gmail.com wrote:

 I'm looking for ways to reduce the runtime of my Spark job.  My code is a
 single file of scala code and is written in this order:

 (1) val lines = Import full dataset using sc.textFile
 (2) val ABonly = Parse out all rows that are not of type A or B
 (3) val processA = Process only the A rows from ABonly
 (4) val processB = Process only the B rows from ABonly

 Is Spark doing (1) then (2) then (3) then (4) ... or is it by default doing
 (1) then (2) then branching to both (3) and (4) simultaneously and running
 both in parallel?  If not, how can I make that happen?

 Jon





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-automatically-run-different-stages-concurrently-when-possible-tp21075.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




HTTP 500 Error for SparkUI in YARN Cluster mode

2014-12-14 Thread Benyi Wang
I got this error when I click "Track URL: ApplicationMaster" while running a
Spark job in YARN cluster mode. I found this JIRA
https://issues.apache.org/jira/browse/YARN-800, but I could not get the
problem fixed. I'm running CDH 5.1.0 with both HDFS and RM HA enabled. Does
anybody have a similar issue? How do you fix this?

HTTP ERROR 500

Problem accessing /proxy/application_1418016558670_0193/. Reason:

Connection refused

Caused by:

java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at 
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at 
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at 
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)


Custom persist or cache of RDD?

2014-11-10 Thread Benyi Wang
When I have a multi-step process flow like this:

A -> B -> C -> D -> E -> F

I need to store B and D's results into parquet files

B.saveAsParquetFile
D.saveAsParquetFile

If I don't cache/persist any step, Spark might recompute from A, B, C, D and
E if something goes wrong in F.

Of course, I'd better cache all steps, if I have enough memory, to avoid this
re-computation, or persist the results to disk. But persisting B and D seems
duplicated with saving B and D as parquet files.

I'm wondering if spark can restore B and D from the parquet files using a
customized persist and restore procedure?
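
Something like the following is what I have in mind (a rough sketch; it
assumes B and D are SchemaRDDs, sqlContext is available, and the paths are
placeholders):

B.saveAsParquetFile("/checkpoints/B.parquet")
val bRestored = sqlContext.parquetFile("/checkpoints/B.parquet")
// build C from bRestored instead of B, so a failure in F recomputes from
// the parquet file rather than from A

D.saveAsParquetFile("/checkpoints/D.parquet")
val dRestored = sqlContext.parquetFile("/checkpoints/D.parquet")
// build E from dRestored in the same way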


Best practice for join

2014-11-04 Thread Benyi Wang
I need to join RDD[A], RDD[B], and RDD[C]. Here is what I did,

# build (K,V) from A and B to prepare the join

val ja = A.map( r => (K1, Va))
val jb = B.map( r => (K1, Vb))

# join A, B

val jab = ja.join(jb)

# build (K,V) from the joined result of A and B to prepare joining with C

val jc = C.map(r => (K2, Vc))
jab.join(jc).map( r => (K, V) ).reduceByKey(_ + _)

Because A may have multiple fields, Va is a tuple with more than 2 fields.
It is said that Scala tuples may not be specialized and there is a
boxing/unboxing issue, so I tried to use case classes for Va, Vb, and Vc,
for the compound keys K2 and K, and for V, which is a pair of count and
ratio (_ + _ creates a new ratio). I registered those case classes with
Kryo.

The sizes of the shuffle read/write look smaller. But I found the GC
overhead is really high: GC time is about 20~30% of the duration for the
reduceByKey task. I think a lot of new objects are created from the case
classes during map/reduce.

How can I make this better?


Re: Best practice for join

2014-11-04 Thread Benyi Wang
I'm using spark-1.0.0 on CDH 5.1.0. The big problem is that Spark SQL
doesn't support hash joins in this version.

On Tue, Nov 4, 2014 at 10:54 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 How about Using SparkSQL https://spark.apache.org/sql/?

 Thanks
 Best Regards

 On Wed, Nov 5, 2014 at 1:53 AM, Benyi Wang bewang.t...@gmail.com wrote:

 I need to join RDD[A], RDD[B], and RDD[C]. Here is what I did,

 # build (K,V) from A and B to prepare the join

 val ja = A.map( r = (K1, Va))
 val jb = B.map( r = (K1, Vb))

 # join A, B

 val jab = ja.join(jb)

 # build (K,V) from the joined result of A and B to prepare joining with C

 val jc = C.map(r = (K2, Vc))
 jab.join(jc).map( = (K,V) ).reduceByKey(_ + _)

 Because A may have multiple fields, so Va is a tuple with more than 2
 fields. It is said that scala Tuple may not be specialized, and there is
 boxing/unboxing issue, so I tried to use case class for Va, Vb, and Vc,
 K2 and K which are compound keys, and V is a pair of count and ratio, _+_
 will create a new ratio. I register those case classes in Kryo.

 The sizes of Shuffle read/write look smaller. But I found GC overhead is
 really high: GC Time is about 20~30% of duration for the reduceByKey task.
 I think a lot of new objects are created using case classes during
 map/reduce.

 How to make the thing better?





How to make Spark-sql join using HashJoin

2014-10-06 Thread Benyi Wang
I'm using CDH 5.1.0 with Spark-1.0.0. There is a spark-sql-1.0.0 artifact in
Cloudera's maven repository. After putting it on the classpath, I can use
spark-sql in my application.

One issue is that I couldn't make the join a hash join. It gives a
CartesianProduct when I join two SchemaRDDs as follows:

scala> val event = sqlContext.parquetFile("/events/2014-09-28").select('MediaEventID)
  .join(log, joinType = LeftOuter, on = Some("event.eventid".attr === "log.eventid".attr))
== Query Plan ==
BroadcastNestedLoopJoin LeftOuter, Some(('event.eventid = 'log.eventid))
 ParquetTableScan [eventid#130L], (ParquetRelation /events/2014-09-28), None
 ParquetTableScan [eventid#125L,listid#126L,isfavorite#127],
(ParquetRelation /logs/eventdt=2014-09-28), None

If I join with another SchemaRDD, I get a CartesianProduct. Is it possible
to make the join a hash join in Spark 1.0.0?


Spark Language Integrated SQL for join on expression

2014-09-29 Thread Benyi Wang
scala> user
res19: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
ParquetTableScan [id#0,name#1], (ParquetRelation
/user/hive/warehouse/user), None

scala> order
res20: org.apache.spark.sql.SchemaRDD =
SchemaRDD[72] at RDD at SchemaRDD.scala:98
== Query Plan ==
ParquetTableScan [id#8,userid#9,unit#10], (ParquetRelation
/user/hive/warehouse/orders), None

When joining the SchemaRDDs user and order, the following will generate an
ambiguity issue because both tables have 'id:

user.join(order, on = Some('id === 'userid))

How can I specify an expression that uses the SchemaRDD name and the column
together? Something like 'user.'id. This expression currently doesn't work
in Spark 1.0.0 on CDH 5.1.0.


How to set KryoRegistrator class in spark-shell

2014-08-20 Thread Benyi Wang
I want to use opencsv's CSVParser to parse csv lines using a script like
below in spark-shell:

import au.com.bytecode.opencsv.CSVParser
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.hadoop.fs.{Path, FileSystem}

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[CSVParser])
  }
}

val outDir = "/tmp/dmc-out"

val fs = FileSystem.get(sc.hadoopConfiguration)
fs.delete(new Path(outDir), true)

val largeLines = sc.textFile("/tmp/dmc-03-08/*.gz")
val parser = new CSVParser('|', '"')
largeLines.map(parser.parseLine(_).toList).saveAsTextFile(outDir,
  classOf[org.apache.hadoop.io.compress.GzipCodec])

If I start spark-shell with spark.kryo.registrator like this

SPARK_JAVA_OPTS="-Dspark.serializer=org.apache.spark.serializer.KryoSerializer -Dspark.kryo.registrator=MyKryoRegistrator" spark-shell

it complains that MyKryoRegistrator is not found when I run :load my_script
in spark-shell.

14/08/20 12:14:01 ERROR KryoSerializer: Failed to run spark.kryo.registrator
java.lang.ClassNotFoundException: MyKryoRegistrator

What's wrong?


Re: How to set KryoRegistrator class in spark-shell

2014-08-20 Thread Benyi Wang
I can do that in my application, but I really want to know how I can do it
in spark-shell because I usually prototype in spark-shell before I put the
code into an application.
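
One workaround I'm considering (a sketch; the package name and jar path are
placeholders): compile the registrator into a small jar so it has a stable
class name outside the REPL, then ship it to the shell explicitly:

SPARK_JAVA_OPTS="-Dspark.serializer=org.apache.spark.serializer.KryoSerializer -Dspark.kryo.registrator=mypackage.MyKryoRegistrator" \
  spark-shell --jars /path/to/my-registrator.jar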



On Wed, Aug 20, 2014 at 12:47 PM, Sameer Tilak ssti...@live.com wrote:

 Hi Wang,
 Have you tried doing this in your application?

    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "yourpackage.MyKryoRegistrator")

 You then don't need to specify it via commandline.


 --
 Date: Wed, 20 Aug 2014 12:25:14 -0700
 Subject: How to set KryoRegistrator class in spark-shell
 From: bewang.t...@gmail.com
 To: user@spark.apache.org


 I want to use opencsv's CSVParser to parse csv lines using a script like
 below in spark-shell:

 import au.com.bytecode.opencsv.CSVParser;
 import com.esotericsoftware.kryo.Kryo
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.hadoop.fs.{Path, FileSystem}

 class MyKryoRegistrator extends KryoRegistrator {
   override def registerClasses(kryo:Kryo) {
 kryo.register(classOf[CSVParser])
   }
 }

 val outDir=/tmp/dmc-out

 val fs = FileSystem.get(sc.hadoopConfiguration)
 fs.delete(new Path(outDir), true);

 val largeLines = sc.textFile(/tmp/dmc-03-08/*.gz)
 val parser = new CSVParser('|', '')
 largeLines.map(parser.parseLine(_).toList).saveAsTextFile(outDir,
 classOf[org.apache.hadoop.io.compress.GzipCodec])

 If I start spark-shell with spark.kryo.registrator like this

 SPARK_JAVA_OPTS=-Dspark.serializer=org.apache.spark.serializer.KryoSerializer
 -Dspark.kryo.registrator=MyKryoRegistrator spark-shell

 it complains that MyKroRegistrator not found when I run :load my_script
 in spark-shell.

 14/08/20 12:14:01 ERROR KryoSerializer: Failed to run
 spark.kryo.registrator
 java.lang.ClassNotFoundException: MyKryoRegistrator

 What's wrong?