Complex transformation on a dataframe column

2015-10-15 Thread Hao Wang
Hi,

I have searched around but could not find a satisfying answer to this question: 
what is the best way to do a complex transformation on a dataframe column?

For example, I have a dataframe with the following schema and a function that 
has pretty complex logic to format addresses. I would like to use the function 
to format each address and store the output as an additional column in the 
dataframe. What is the best way to do it? Use DataFrame.map? Define a UDF? A
code example would be appreciated.

Input dataframe:
root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PhoneNumber: string (nullable = true)
 |-- Address: string (nullable = true)

Output dataframe:
root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PhoneNumber: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- FormattedAddress: string (nullable = true)

The function for formatting addresses:
def formatAddress(address: String): String


Best regards,
Hao Wang
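
For reference, a minimal sketch of the UDF route (assuming a Spark 1.5-era
DataFrame API; df stands for the input dataframe and formatAddress for the
function above, the other names are illustrative):

import org.apache.spark.sql.functions.udf

// Wrap the existing formatAddress logic in a UDF and add its output as a new
// column; the original columns are left untouched.
val formatAddressUdf = udf((address: String) => formatAddress(address))
val withFormatted = df.withColumn("FormattedAddress", formatAddressUdf(df("Address")))
withFormatted.printSchema()  // should now show the extra FormattedAddress column

Since Address is nullable in the schema, the wrapped function would need to
handle a null input before formatting.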




Re: How to convert dataframe to a nested StructType schema

2015-09-17 Thread Hao Wang
Thanks, Terry. This is exactly what I need :)

Hao

On Tue, Sep 15, 2015 at 8:47 PM, Terry Hole <hujie.ea...@gmail.com> wrote:

> Hao,
>
> For spark 1.4.1, you can try this:
> val rowrdd = df.rdd.map(r => Row(Row(r(3)), Row(r(0), r(1), r(2))))
> val newDF = sqlContext.createDataFrame(rowrdd, yourNewSchema)
>
> Thanks!
>
> - Terry
>
> On Wed, Sep 16, 2015 at 2:10 AM, Hao Wang <billhao.l...@gmail.com> wrote:
>
>> Hi,
>>
>> I created a dataframe with 4 string columns (city, state, country,
>> zipcode).
>> I then applied the following nested schema to it by creating a custom
>> StructType. When I run df.take(5), it gives the exception below as
>> expected.
>> The question is how I can convert the Rows in the dataframe to conform to
>> this nested schema? Thanks!
>>
>> root
>>  |-- ZipCode: struct (nullable = true)
>>  ||-- zip: string (nullable = true)
>>  |-- Address: struct (nullable = true)
>>  ||-- city: string (nullable = true)
>>  ||-- state: string (nullable = true)
>>  ||-- country: string (nullable = true)
>>
>> [info]   org.apache.spark.SparkException: Job aborted due to stage
>> failure:
>> Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in
>> stage 6.0 (TID 6, localhost): scala.MatchError: 95123 (of class
>> java.lang.String)
>> [info] at
>>
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:178)
>> [info] at
>>
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
>> [info] at
>>
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
>> [info] at
>> org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>> [info] at
>> org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>>
>>
>>
>>
>>
>>
>
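
For completeness, a sketch of what yourNewSchema and the full conversion might
look like (an assumption reconstructed from the printSchema output above, not
code taken from this thread):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Nested schema matching the desired root printed above.
val yourNewSchema = StructType(Seq(
  StructField("ZipCode", StructType(Seq(
    StructField("zip", StringType, nullable = true))), nullable = true),
  StructField("Address", StructType(Seq(
    StructField("city", StringType, nullable = true),
    StructField("state", StringType, nullable = true),
    StructField("country", StringType, nullable = true))), nullable = true)))

// Re-nest each flat (city, state, country, zipcode) row, then apply the schema.
val rowrdd = df.rdd.map(r => Row(Row(r(3)), Row(r(0), r(1), r(2))))
val newDF = sqlContext.createDataFrame(rowrdd, yourNewSchema)
newDF.printSchema()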


How to convert dataframe to a nested StructType schema

2015-09-15 Thread Hao Wang
Hi, 

I created a dataframe with 4 string columns (city, state, country, zipcode).
I then applied the following nested schema to it by creating a custom
StructType. When I run df.take(5), it gives the exception below as expected.
The question is how I can convert the Rows in the dataframe to conform to
this nested schema? Thanks! 

root 
 |-- ZipCode: struct (nullable = true) 
 ||-- zip: string (nullable = true) 
 |-- Address: struct (nullable = true) 
 ||-- city: string (nullable = true) 
 ||-- state: string (nullable = true) 
 ||-- country: string (nullable = true) 

[info]   org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in
stage 6.0 (TID 6, localhost): scala.MatchError: 95123 (of class
java.lang.String) 
[info] at
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:178)
 
[info] at
org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
 
[info] at
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
 
[info] at
org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488) 
[info] at
org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488) 







How to convert dataframe to a nested StructType schema

2015-09-14 Thread Hao Wang
Hi, 

I created a dataframe with 4 string columns (city, state, country, zipcode).
I then applied the following nested schema to it by creating a custom
StructType. When I run df.take(5), it gives the exception below as expected.
The question is how I can convert the Rows in the dataframe to conform to
this nested schema? Thanks!

root
 |-- ZipCode: struct (nullable = true)
 ||-- zip: string (nullable = true)
 |-- Address: struct (nullable = true)
 ||-- city: string (nullable = true)
 ||-- state: string (nullable = true)
 ||-- country: string (nullable = true)

[info]   org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in
stage 6.0 (TID 6, localhost): scala.MatchError: 95123 (of class
java.lang.String)
[info]  at
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:178)
[info]  at
org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
[info]  at
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
[info]  at
org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
[info]  at
org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)







Re: How to split log data into different files according to severity

2015-06-14 Thread Hao Wang
Thanks for the link. I’m still running 1.3.1 but will give it a try :)

Hao

 On Jun 13, 2015, at 9:38 AM, Will Briggs wrbri...@gmail.com wrote:
 
 Check out this recent post by Cheng Lian regarding dynamic partitioning in 
 Spark 1.4: https://www.mail-archive.com/user@spark.apache.org/msg30204.html
 
 On June 13, 2015, at 5:41 AM, Hao Wang bill...@gmail.com wrote:
 
 
 Hi,
 
 I have a bunch of large log files on Hadoop. Each line contains a log and its 
 severity. Is there a way that I can use Spark to split the entire data set 
 into different files on Hadoop according to the severity field? Thanks. Below is 
 an example of the input and output.
 
 Input:
 [ERROR] log1
 [INFO] log2
 [ERROR] log3
 [INFO] log4
 
 Output:
 error_file
 [ERROR] log1
 [ERROR] log3
 
 info_file
 [INFO] log2
 [INFO] log4
 
 
 Best,
 Hao Wang
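
As a rough illustration of the dynamic-partitioning route linked above (a
sketch only: it assumes the Spark 1.4 DataFrame API, and the input/output
paths are placeholders):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Parse each line into (severity, log) and let DataFrameWriter.partitionBy
// write one directory per severity in a single pass.
val logs = sc.textFile("hdfs:///logs/input")
  .map { line =>
    val severity = line.takeWhile(_ != ']').stripPrefix("[")  // "[ERROR] log1" -> "ERROR"
    (severity, line)
  }
  .toDF("severity", "log")

// Produces .../severity=ERROR/, .../severity=INFO/, and so on.
logs.write.partitionBy("severity").format("parquet").save("hdfs:///logs/by-severity")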



Re: How to split log data into different files according to severity

2015-06-13 Thread Hao Wang
I am currently using filter inside a loop over all severity levels to do this, 
which I think is pretty inefficient: it has to read the entire data set once 
for each severity. I wonder if there is a more efficient way that takes just 
one pass over the data? Thanks.

Best,
Hao Wang

 On Jun 13, 2015, at 3:48 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 
 Are you looking for something like filter? See a similar example here: 
 https://spark.apache.org/examples.html
 
 Thanks
 Best Regards
 
 On Sat, Jun 13, 2015 at 3:11 PM, Hao Wang bill...@gmail.com wrote:
 Hi,
 
 I have a bunch of large log files on Hadoop. Each line contains a log and its 
 severity. Is there a way that I can use Spark to split the entire data set 
 into different files on Hadoop according to the severity field? Thanks. Below is 
 an example of the input and output.
 
 Input:
 [ERROR] log1
 [INFO] log2
 [ERROR] log3
 [INFO] log4
 
 Output:
 error_file
 [ERROR] log1
 [ERROR] log3
 
 info_file
 [INFO] log2
 [INFO] log4
 
 
 Best,
 Hao Wang
 



How to split log data into different files according to severity

2015-06-13 Thread Hao Wang
Hi,

I have a bunch of large log files on Hadoop. Each line contains a log and
its severity. Is there a way that I can use Spark to split the entire data
set into different files on Hadoop according to the severity field? Thanks.
Below is an example of the input and output.

Input:
[ERROR] log1
[INFO] log2
[ERROR] log3
[INFO] log4

Output:
error_file
[ERROR] log1
[ERROR] log3

info_file
[INFO] log2
[INFO] log4


Best,
Hao Wang
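
For reference, one way to do the split in a single pass over the data (not
something suggested in this thread; it leans on Hadoop's
MultipleTextOutputFormat, and the paths and class name are illustrative):

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Route each record to a file named after its key; returning null as the
// actual key keeps only the raw line in the output files.
class SeverityOutputFormat extends MultipleTextOutputFormat[String, String] {
  override def generateFileNameForKeyValue(key: String, value: String, name: String): String =
    key.toLowerCase + "_file/" + name                 // e.g. error_file/part-00000
  override def generateActualKey(key: String, value: String): String = null
}

sc.textFile("hdfs:///logs/input")
  .map(line => (line.takeWhile(_ != ']').stripPrefix("["), line))   // key by severity
  .saveAsHadoopFile("hdfs:///logs/split",
    classOf[String], classOf[String], classOf[SeverityOutputFormat])

This keys every line by its severity and writes one set of files per key, so
the data set is read only once.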


Re: Kryo deserialisation error

2014-07-17 Thread Hao Wang
Hi, all

Yes, it's the name of a Wikipedia article. I am running the WikipediaPageRank
example from Spark's Bagel package.
I am wondering whether there is any relation to the Kryo buffer size.

The PageRank job usually finishes successfully; sometimes it does not, because
this kind of Kryo exception happens too many times and exceeds maxTaskFailures.

I also see this Kryo "unable to find class" exception in my successful runs.


Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


On Thu, Jul 17, 2014 at 4:44 PM, Sean Owen so...@cloudera.com wrote:

 Not sure if this helps, but it does seem to be part of a name in a
 Wikipedia article, and Wikipedia is the data set. So something is
 reading this class name from the data.

 http://en.wikipedia.org/wiki/Carl_Fridtjof_Rode

 On Thu, Jul 17, 2014 at 9:40 AM, Tathagata Das
 tathagata.das1...@gmail.com wrote:
  Seems like there is some sort of stream corruption, causing Kryo read to
  read a weird class name from the stream (the name arl Fridtjof Rode in
 the
  exception cannot be a class!).
  Not sure how to debug this.
 
  @Patrick: Any idea?
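
If the buffer-size theory is worth testing, one quick experiment (my
suggestion only; spark.kryoserializer.buffer.mb is the same property the
SparkContext configuration shown further down already sets to 24):

// Raise the Kryo buffer well above the current 24 MB; if the
// "unable to find class" failures persist at a much larger value,
// an undersized buffer is probably not the cause.
sparkConf.set("spark.kryoserializer.buffer.mb", "256")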



Re: Kryo deserialisation error

2014-07-16 Thread Hao Wang
Thanks for your reply. The SparkContext is configured as below:

sparkConf.setAppName("WikipediaPageRank")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
val inputFile = args(0)
val threshold = args(1).toDouble
val numPartitions = args(2).toInt
val usePartitioner = args(3).toBoolean

sparkConf.setAppName("WikipediaPageRank")
sparkConf.set("spark.executor.memory", "60g")
sparkConf.set("spark.cores.max", "48")
sparkConf.set("spark.kryoserializer.buffer.mb", "24")
val sc = new SparkContext(sparkConf)

sc.addJar("~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar")

And I use spark-submit to run the application:
./bin/spark-submit --master spark://sing12:7077
--total-executor-cores 40 --executor-memory 40g --class
org.apache.spark.examples.bagel.WikipediaPageRank
~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar
hdfs://192.168.1.12:9000/freebase-26G 1 200 True


Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


On Wed, Jul 16, 2014 at 1:41 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Are you using classes from external libraries that have not been added to
 the sparkContext, using sparkcontext.addJar()?

 TD


 On Tue, Jul 15, 2014 at 8:36 PM, Hao Wang wh.s...@gmail.com wrote:

 I am running the WikipediaPageRank example in Spark and I have the same
 problem as you:

 4/07/16 11:31:06 DEBUG DAGScheduler: submitStage(Stage 6)
 14/07/16 11:31:06 ERROR TaskSetManager: Task 6.0:450 failed 4 times;
 aborting job
 14/07/16 11:31:06 INFO DAGScheduler: Failed to run foreach at
 Bagel.scala:251
 Exception in thread main 14/07/16 11:31:06 INFO TaskSchedulerImpl:
 Cancelling stage 6
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 6.0:450 failed 4 times, most recent failure: Exception failure in TID 1330
 on host sing11: com.esotericsoftware.kryo.KryoException: Unable to find
 class: arl Fridtjof Rode

 com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)

 com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
 com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
 com.twitter.chill.TraversableSerializer.read(Traversable.scala:44)
 com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115)

 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)

 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

 org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
 org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)

 Could anyone help?

 Regards,
 Wang Hao(王灏)

 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com


 On Tue, Jun 3, 2014 at 8:02 PM, Denes te...@outlook.com wrote:

 I tried to use Kryo as a serialiser in Spark Streaming and did everything
 according to the guide posted on the Spark website, i.e. added the
 following lines:

 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
 conf.set("spark.kryo.registrator", "MyKryoRegistrator");

 I also added the necessary classes to the MyKryoRegistrator.

 However I get the following strange error, can someone help me out where
 to
 look for a solution?

 14/06/03 09:00:49 ERROR scheduler.JobScheduler: Error running job
 streaming
 job 140177880 ms.0
 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception
 while deserializing and fetching task:
 com.esotericsoftware.kryo.KryoException: Unable to find class: J
 Serialization trace:
 id (org.apache.spark.storage.GetBlock)
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59

Re: Kryo deserialisation error

2014-07-16 Thread Hao Wang
I am not sure. Not every task fails with this Kryo exception; most of the
time, the cluster finishes WikipediaPageRank successfully.
How could I debug this exception?

Thanks

Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


On Thu, Jul 17, 2014 at 2:58 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Is the class that is not found in the wikipediapagerank jar?

 TD


 On Wed, Jul 16, 2014 at 12:32 AM, Hao Wang wh.s...@gmail.com wrote:

 Thanks for your reply. The SparkContext is configured as below:


  sparkConf.setAppName("WikipediaPageRank")
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
  val inputFile = args(0)
  val threshold = args(1).toDouble
  val numPartitions = args(2).toInt
  val usePartitioner = args(3).toBoolean

  sparkConf.setAppName("WikipediaPageRank")
  sparkConf.set("spark.executor.memory", "60g")
  sparkConf.set("spark.cores.max", "48")
  sparkConf.set("spark.kryoserializer.buffer.mb", "24")
  val sc = new SparkContext(sparkConf)

  sc.addJar("~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar")

 And I use spark-submit to run the application:

  ./bin/spark-submit --master spark://sing12:7077 --total-executor-cores 40 \
    --executor-memory 40g --class org.apache.spark.examples.bagel.WikipediaPageRank \
    ~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar \
    hdfs://192.168.1.12:9000/freebase-26G 1 200 True

 Regards,
 Wang Hao(王灏)

 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com


 On Wed, Jul 16, 2014 at 1:41 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using classes from external libraries that have not been added
 to the sparkContext, using sparkcontext.addJar()?

 TD


 On Tue, Jul 15, 2014 at 8:36 PM, Hao Wang wh.s...@gmail.com wrote:

 I am running the WikipediaPageRank example in Spark and I have the same
 problem as you:

 4/07/16 11:31:06 DEBUG DAGScheduler: submitStage(Stage 6)
 14/07/16 11:31:06 ERROR TaskSetManager: Task 6.0:450 failed 4 times;
 aborting job
 14/07/16 11:31:06 INFO DAGScheduler: Failed to run foreach at
 Bagel.scala:251
 Exception in thread main 14/07/16 11:31:06 INFO TaskSchedulerImpl:
 Cancelling stage 6
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 6.0:450 failed 4 times, most recent failure: Exception failure in TID 1330
 on host sing11: com.esotericsoftware.kryo.KryoException: Unable to find
 class: arl Fridtjof Rode

 com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)

 com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
 com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)

 com.twitter.chill.TraversableSerializer.read(Traversable.scala:44)

 com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115)

 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)

 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)

 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

 org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
 org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)

 Could anyone help?

 Regards,
 Wang Hao(王灏)

 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com


 On Tue, Jun 3, 2014 at 8:02 PM, Denes te...@outlook.com wrote:

 I tried to use Kryo as a serialiser in Spark Streaming and did everything
 according to the guide posted on the Spark website, i.e. added the
 following lines:

 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
 conf.set("spark.kryo.registrator", "MyKryoRegistrator");

 I also added the necessary classes to the MyKryoRegistrator.

 However I get the following strange error, can someone help me out
 where to
 look for a solution?

 14/06/03 09:00:49 ERROR scheduler.JobScheduler: Error running job
 streaming
 job 140177880 ms.0
 org.apache.spark.SparkException: Job aborted due

Re: Kryo deserialisation error

2014-07-15 Thread Hao Wang
I am running the WikipediaPageRank example in Spark and I have the same
problem as you:

4/07/16 11:31:06 DEBUG DAGScheduler: submitStage(Stage 6)
14/07/16 11:31:06 ERROR TaskSetManager: Task 6.0:450 failed 4 times;
aborting job
14/07/16 11:31:06 INFO DAGScheduler: Failed to run foreach at
Bagel.scala:251
Exception in thread main 14/07/16 11:31:06 INFO TaskSchedulerImpl:
Cancelling stage 6
org.apache.spark.SparkException: Job aborted due to stage failure: Task
6.0:450 failed 4 times, most recent failure: Exception failure in TID 1330
on host sing11: com.esotericsoftware.kryo.KryoException: Unable to find
class: arl Fridtjof Rode

com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)

com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
com.twitter.chill.TraversableSerializer.read(Traversable.scala:44)
com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115)

org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)

org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)

Could anyone help?

Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


On Tue, Jun 3, 2014 at 8:02 PM, Denes te...@outlook.com wrote:

 I tried to use Kryo as a serialiser in Spark Streaming and did everything
 according to the guide posted on the Spark website, i.e. added the
 following lines:

 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
 conf.set("spark.kryo.registrator", "MyKryoRegistrator");

 I also added the necessary classes to the MyKryoRegistrator.

 However I get the following strange error, can someone help me out where to
 look for a solution?

 14/06/03 09:00:49 ERROR scheduler.JobScheduler: Error running job streaming
 job 140177880 ms.0
 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception
 while deserializing and fetching task:
 com.esotericsoftware.kryo.KryoException: Unable to find class: J
 Serialization trace:
 id (org.apache.spark.storage.GetBlock)
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
 at

 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)






Re: long GC pause during file.cache()

2014-06-15 Thread Hao Wang
Hi, Wei

You may try to set JVM opts in spark-env.sh as follows to prevent or
mitigate GC pauses:

export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC -Xmx2g -XX:MaxPermSize=256m"

There are more options you could add, please just Google :)


Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan w...@us.ibm.com wrote:

 Hi,

   I have a single node (192G RAM) stand-alone spark, with memory
 configuration like this in spark-env.sh

 SPARK_WORKER_MEMORY=180g
 SPARK_MEM=180g


  In spark-shell I have a program like this:

 val file = sc.textFile("/localpath") // file size is 40G
 file.cache()

 val output = file.map(line => extract something from line)

 output.saveAsTextFile(...)


 When I run this program again and again, or keep trying file.unpersist()
 -- file.cache() -- output.saveAsTextFile(), the run time varies a lot,
 from 1 min to 3 min to 50+ min. Whenever the run-time is more than 1 min,
 from the stage monitoring GUI I observe big GC pause (some can be 10+ min).
 Of course when run-time is normal, say ~1 min, no significant GC is
 observed. The behavior seems somewhat random.

 Is there any JVM tuning I should do to prevent this long GC pause from
 happening?



 I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something
 like this:

 root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12
 /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp
 ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
 -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g
 org.apache.spark.deploy.SparkSubmit spark-shell --class
 org.apache.spark.repl.Main

 Best regards,
 Wei

 -
 Wei Tan, PhD
 Research Staff Member
 IBM T. J. Watson Research Center
 *http://researcher.ibm.com/person/us-wtan*
 http://researcher.ibm.com/person/us-wtan
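
One further knob that sometimes helps with GC pauses on a large cached text
file (a suggestion of mine, not something raised in this thread): cache the
RDD in serialized form, so the heap holds a few large byte buffers instead of
millions of small String objects, at the cost of extra CPU when the data is
read back. A sketch:

import org.apache.spark.storage.StorageLevel

val file = sc.textFile("/localpath")        // same 40G input as above
file.persist(StorageLevel.MEMORY_ONLY_SER)  // instead of file.cache()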


Akka listens to hostname while user may spark-submit with master in IP url

2014-06-15 Thread Hao Wang
Hi, All

In Spark, spark.driver.host defaults to the driver's hostname, so the Akka
actor system listens on a URL like akka.tcp://hostname:port. However, when a
user runs an application with spark-submit, they may set
--master spark://192.168.1.12:7077.

Then the AppClient in SparkDeploySchedulerBackend cannot successfully
register with the Master, and the console prints:

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check
your cluster UI to ensure that workers are registered and have sufficient
memory

I think we need to improve this by making Akka recognise both the hostname and
the corresponding IP, or at least add a note to the Spark documentation warning
users against using an IP address here. Any comments?

Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


BUG? Why does MASTER have to be set to spark://hostname:port?

2014-06-13 Thread Hao Wang
Hi, all

When I try to run Spark PageRank using:

./bin/spark-submit \
--master spark://192.168.1.12:7077 \
--class org.apache.spark.examples.bagel.WikipediaPageRank \
~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar \
hdfs://192.168.1.12:9000/freebase-13G 0.05 100 True

I don't specify the Spark master via SparkContext.setMaster() in the PageRank
code.

Unfortunately, it hung here:

14/06/13 22:09:43 INFO DAGScheduler: Submitting 104 missing tasks from
Stage 0 (MappedRDD[1] at textFile at WikipediaPageRank.scala:59)
14/06/13 22:09:43 INFO TaskSchedulerImpl: Adding task set 0.0 with 104 tasks
14/06/13 22:09:58 WARN TaskSchedulerImpl: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are
registered and have sufficient memory

But after I change --master to spark://hostname:7077, it works normally.

Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


Re: how to set spark.executor.memory and heap size

2014-06-13 Thread Hao Wang
Hi, Laurent

You could set spark.executor.memory and the heap size with the following methods:

1. In your conf/spark-env.sh:
export SPARK_WORKER_MEMORY=38g
export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC -Xmx2g -XX:MaxPermSize=256m"

2. You could also set the executor memory and Java opts via spark-submit
parameters.

Check the Spark configuration and tuning docs; you will find full answers
there.
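
For completeness, a programmatic sketch of the same settings (an assumption on
my part: it presumes a Spark version that supports
spark.executor.extraJavaOptions, so the property names are worth checking
against the configuration docs for your release; the app name is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MemoryTuningExample")    // hypothetical application name
  .set("spark.executor.memory", "38g")  // heap size per executor
  .set("spark.executor.extraJavaOptions",
       "-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC -XX:MaxPermSize=256m")
val sc = new SparkContext(conf)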


Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


On Thu, Jun 12, 2014 at 6:29 PM, Laurent T laurent.thou...@ldmobile.net
wrote:

 Hi,

 Can you give us a little more insight on how you used that file to solve
 your problem ?
 We're having the same OOM as you were and haven't been able to solve it
 yet.

 Thanks






Spark 1.0.0 Standalone AppClient cannot connect Master

2014-06-12 Thread Hao Wang
Hi, all

Why has the official Spark 1.0.0 documentation removed the instructions for
building Spark against a specific Hadoop version?

Does it mean that I don't need to specify the Hadoop version when I build my
Spark 1.0.0 with `sbt/sbt assembly`?


Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


Re: Spark 1.0.0 Standalone AppClient cannot connect Master

2014-06-12 Thread Hao Wang
Hi, Andrew

Got it, Thanks!

Hao

Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


On Fri, Jun 13, 2014 at 12:42 AM, Andrew Or and...@databricks.com wrote:

 Hi Wang Hao,

 This is not removed. We moved it here:
 http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html
 If you're building with SBT, and you don't specify the
 SPARK_HADOOP_VERSION, then it defaults to 1.0.4.

 Andrew


 2014-06-12 6:24 GMT-07:00 Hao Wang wh.s...@gmail.com:

 Hi, all

  Why has the official Spark 1.0.0 documentation removed the instructions for
  building Spark against a specific Hadoop version?

  Does it mean that I don't need to specify the Hadoop version when I build
  my Spark 1.0.0 with `sbt/sbt assembly`?


 Regards,
 Wang Hao(王灏)

 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com





Re: com.google.protobuf out of memory

2014-05-25 Thread Hao Wang
Hi, Zuhair

In my experience, you could try the following steps to avoid Spark OOM:

   1. Increase JVM memory by adding export SPARK_JAVA_OPTS="-Xmx2g"
   2. Use .persist(StorageLevel.MEMORY_AND_DISK) instead of .cache()
   3. Have you set the spark.executor.memory value? It's 512m by default.
   4. Add more memory to the workers.

I haven't tried modifying the spark.shuffle.memoryFraction value, but it is
said to control when shuffle contents are spilled to disk. I think you could
decrease this value to reduce memory pressure.
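
A compact sketch of points 1-3 above (property names are from the Spark
0.9-era configuration; the values, app name, and input path are placeholders
to adjust for your cluster):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("ProtobufOomSketch")               // hypothetical name; set your master as usual
  .set("spark.executor.memory", "4g")            // point 3: raise executor memory
  .set("spark.shuffle.memoryFraction", "0.2")    // lower value spills shuffle data earlier
val sc = new SparkContext(conf)

val data = sc.textFile("hdfs:///input")          // hypothetical input
data.persist(StorageLevel.MEMORY_AND_DISK)       // point 2: instead of data.cache()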

Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


On Sun, May 25, 2014 at 4:15 PM, Zuhair Khayyat zuhair.khay...@gmail.com wrote:

 Dear all,

 I am getting an OutOfMemoryError in class ByteString.java from package
 com.google.protobuf when processing very large data using Spark 0.9. Does
 increasing spark.shuffle.memoryFraction help, or should I add more memory
 to my workers? Below is the error I get during execution.

 14/05/25 07:26:05 INFO MapOutputTrackerMasterActor: Asked to send map
 output locations for shuffle 0 to spark@cloud21:47985

 14/05/25 07:26:05 INFO MapOutputTrackerMasterActor: Asked to send map
 output locations for shuffle 0 to spark@cloud5:46977

 14/05/25 07:26:05 INFO MapOutputTrackerMasterActor: Asked to send map
 output locations for shuffle 0 to spark@cloud14:51948

 14/05/25 07:26:05 INFO MapOutputTrackerMasterActor: Asked to send map
 output locations for shuffle 0 to spark@cloud12:45368

 14/05/25 07:26:05 INFO MapOutputTrackerMasterActor: Asked to send map
 output locations for shuffle 0 to spark@cloud9:50926

 14/05/25 07:26:05 INFO MapOutputTrackerMasterActor: Asked to send map
 output locations for shuffle 0 to spark@cloud10:50690

 14/05/25 07:26:12 ERROR ActorSystemImpl: Uncaught fatal error from thread
 [spark-akka.actor.default-dispatcher-5] shutting down ActorSystem [spark]

 java.lang.OutOfMemoryError: Java heap space

 at
 com.google.protobuf_spark.ByteString.copyFrom(ByteString.java:90)

 at
 com.google.protobuf_spark.ByteString.copyFrom(ByteString.java:99)

 at
 akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)

 at
 akka.remote.EndpointWriter$$anonfun$akka$remote$EndpointWriter$$serializeMessage$1.apply(Endpoint.scala:672)

 at
 akka.remote.EndpointWriter$$anonfun$akka$remote$EndpointWriter$$serializeMessage$1.apply(Endpoint.scala:672)

 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

 at
 akka.remote.EndpointWriter.akka$remote$EndpointWriter$$serializeMessage(Endpoint.scala:671)

 at
 akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:559)

 at
 akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:544)

 at
 scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)

 at akka.actor.FSM$class.processEvent(FSM.scala:595)

 at akka.remote.EndpointWriter.processEvent(Endpoint.scala:443)

 at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:589)

 at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:583)

 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)

 at akka.actor.ActorCell.invoke(ActorCell.scala:456)

 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)

 at akka.dispatch.Mailbox.run(Mailbox.scala:219)

 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:385)

 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 Thank you,



For performance, Spark prefers OracleJDK or OpenJDK?

2014-05-19 Thread Hao Wang
Hi,

Oracle JDK and OpenJDK, which one is better or preferred for Spark?


Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


java.lang.NoClassDefFoundError: org/apache/spark/deploy/worker/Worker

2014-05-18 Thread Hao Wang
Hi, all

Spark version: bae07e3 [behind 1] fix different versions of commons-lang
dependency and apache/spark#746 addendum

I have six worker nodes, and four of them hit this NoClassDefFoundError when
I use the start-slaves.sh script on my driver node. However, running
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://MASTER_IP:PORT
on the worker nodes works fine.

I compile the /spark directory on the driver node and distribute it to all the
worker nodes. The paths on all nodes are identical.

Here are the logs from one of the four failing worker nodes.

Spark Command: java -cp
::/home/wanghao/spark/conf:/home/wanghao/spark/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop2.2.0.jar
-Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m
org.apache.spark.deploy.worker.Worker spark://192.168.1.12:7077
--webui-port 8081


Exception in thread main java.lang.NoClassDefFoundError:
org/apache/spark/deploy/worker/Worker
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.deploy.worker.Worker
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
Could not find the main class: org.apache.spark.deploy.worker.Worker.
Program will exit.

Here is spark-env.sh

export SPARK_WORKER_MEMORY=1g
export SPARK_MASTER_IP=192.168.1.12
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=2

hosts file:

127.0.0.1   localhost
192.168.1.12sing12

# The following lines are desirable for IPv6 capable hosts
::1 ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters

192.168.1.11 sing11
192.168.1.59 sing59

###
# failed machines
###

192.168.1.122 host122
192.168.1.123 host123
192.168.1.124 host124
192.168.1.125 host125


Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com