Re: Is there a list of missing optimizations for typed functions?

2017-02-27 Thread lihu
Hi, you can refer to https://issues.apache.org/jira/browse/SPARK-14083 for
more detail.

For the performance issue, it is better to use the DataFrame API than the Dataset API.
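
For reference, here is a minimal sketch of the comparison from the quoted thread below (hypothetical dataset, column names, and path; it assumes a SparkSession `spark` and `import spark.implicits._`), where the Column predicate can be pushed to the source while the typed lambda cannot:

case class Record(col: Long, payload: String)

// Read a Parquet-backed Dataset so that the source supports filter pushdown.
val ds = spark.read.parquet("/tmp/records").as[Record]

// Column-based predicate: explain() typically shows PushedFilters: [GreaterThan(col,1)]
ds.where($"col" > 1).explain()

// Typed lambda: the function is opaque to Catalyst, so the plan shows a filter
// over deserialized objects and no pushed filter.
ds.filter(_.col > 1).explain()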

On Sat, Feb 25, 2017 at 2:45 AM, Jacek Laskowski  wrote:

> Hi Justin,
>
> I have never seen such a list. I think the area is in heavy development
> esp. optimizations for typed operations.
>
> There's a JIRA to somehow find out more on the behavior of Scala code
> (non-Column-based one from your list) but I've seen no activity in this
> area. That's why for now Column-based untyped queries could be faster due
> to more optimizations applied. Same about UDFs.
>
> Jacek
>
> On 23 Feb 2017 7:52 a.m., "Justin Pihony"  wrote:
>
>> I was curious if there was introspection of certain typed functions and
>> ran
>> the following two queries:
>>
>> ds.where($"col" > 1).explain
>> ds.filter(_.col > 1).explain
>>
>> And found that the typed function does NOT result in a PushedFilter. I
>> imagine this is due to a limited view of the function, so I have two
>> questions really:
>>
>> 1.) Is there a list of the methods that lose some of the optimizations
>> that
>> you get from non-functional methods? Is it any method that accepts a
>> generic
>> function?
>> 2.) Is there any work to attempt reflection and gain some of these
>> optimizations back? I couldn't find anything in JIRA.
>>
>> Thanks,
>> Justin Pihony
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Is-there-a-list-of-missing-optimizatio
>> ns-for-typed-functions-tp28418.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Graphx

2016-03-11 Thread lihu
Hi John,
   I am very interested in your experiment. How did you determine that RDD
serialization cost a lot of time, from the logs or from some other tools?
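
For context, a minimal sketch of the kind of connected-components job discussed
in the quoted thread below, assuming GraphX's standard GraphLoader and
connectedComponents API and a hypothetical edge-list path:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader

val sc = new SparkContext(new SparkConf().setAppName("cc-sketch"))
// Edge-list file with one "srcId dstId" pair per line.
val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges")
// connectedComponents labels every vertex with the smallest vertex id in its component.
val cc = graph.connectedComponents().vertices
println("number of components: " + cc.map(_._2).distinct().count())
sc.stop()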

On Fri, Mar 11, 2016 at 8:46 PM, John Lilley 
wrote:

> Andrew,
>
>
>
> We conducted some tests for using Graphx to solve the connected-components
> problem and were disappointed.  On 8 nodes of 16GB each, we could not get
> above 100M edges.  On 8 nodes of 60GB each, we could not process 1bn
> edges.  RDD serialization would take excessive time and then we would get
> failures.  By contrast, we have a C++ algorithm that solves 1bn edges using
> memory+disk on a single 16GB node in about an hour.  I think that a very
> large cluster will do better, but we did not explore that.
>
>
>
> *John Lilley*
>
> Chief Architect, RedPoint Global Inc.
>
> T: +1 303 541 1516  *| *M: +1 720 938 5761 *|* F: +1 781-705-2077
>
> Skype: jlilley.redpoint *|* john.lil...@redpoint.net *|* www.redpoint.net
>
>
>
> *From:* Andrew A [mailto:andrew.a...@gmail.com]
> *Sent:* Thursday, March 10, 2016 2:44 PM
> *To:* u...@spark.incubator.apache.org
> *Subject:* Graphx
>
>
>
> Hi, is there anyone who uses GraphX in production? What is the maximum size of
> graph you have processed with Spark, and what cluster did you use for it?
>
> I tried to calculate PageRank on ~1 GB of edges (the LiveJournal dataset, via
> LiveJournalPageRank from the Spark examples) and ran into large shuffles
> produced by Spark which failed my job.
>
> Thank you,
>
> Andrew
>


how to using local repository in spark[dev]

2015-11-27 Thread lihu
Hi, All:

 I modified the Spark code and am trying to use some extra jars in Spark; the
extra jars are published to my local Maven repository using *mvn install*.
 However, sbt cannot find these jar files, even though I can find them
under */home/myname/.m2/repository*.
I can guarantee that the local m2 repository is added to the resolvers,
because I get the following resolvers using the *show resolvers* command.


*List(central: https://repo1.maven.org/maven2,
apache-repo: https://repository.apache.org/content/repositories/releases,
jboss-repo: https://repository.jboss.org/nexus/content/repositories/releases,
mqtt-repo: https://repo.eclipse.org/content/repositories/paho-releases,
cloudera-repo: https://repository.cloudera.com/artifactory/cloudera-repos,
spark-hive-staging: https://oss.sonatype.org/content/repositories/orgspark-project-1113,
mapr-repo: http://repository.mapr.com/maven/,
spring-releases: https://repo.spring.io/libs-release,
twttr-repo: http://maven.twttr.com,
apache.snapshots: http://repository.apache.org/snapshots,
cache:Maven2 Local: /home/myname/.m2/repository)*


Does anyone know how to deal with this? In fact, this worked some days ago,
but after I updated my custom jar and ran the install again recently, it no
longer works.

Environment: Spark 1.5, sbt 0.13.7/0.13.9
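
In case it helps others hitting the same thing, a minimal sketch of how the
local Maven repository and a custom artifact are usually wired into an sbt
build (the organization/name/version below are hypothetical placeholders). If
sbt keeps resolving a stale copy after the jar is re-installed, clearing the
matching entry under ~/.ivy2/cache and re-running `sbt update` is a common
workaround:

// build.sbt fragment (sketch)
resolvers += Resolver.mavenLocal   // resolves against ~/.m2/repository

libraryDependencies += "com.example" % "my-extra-lib" % "0.1.0-SNAPSHOT"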


Re: high GC in the Kmeans algorithm

2015-02-17 Thread lihu
Thanks for your answer. Yes, I cached the data; I can observe from the
Web UI that all the data is cached in memory.

What I worry about is the dimension, not the total size.

Sean Owen once answered me that Broadcast supports a maximum array
size of 2GB, so is 10^7 a little too large?
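
A rough back-of-envelope sketch of why the dimension matters for memory
pressure (an assumption on my side: dense Array[Double] centers, with
hypothetical k and runs):

val dim = 1e7                                   // number of features, as above
val k = 10                                      // clusters per run (hypothetical)
val runs = 1                                    // parallel runs (hypothetical)
val bytesPerCenter = dim * 8                    // one Double = 8 bytes, so ~80 MB
val broadcastBytes = bytesPerCenter * k * runs  // ~800 MB of centers per step
// Still well under the 2GB array limit for modest k, but arrays of this size
// are allocated and copied repeatedly per task, which can drive long GC pauses.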

On Wed, Feb 18, 2015 at 5:43 AM, Xiangrui Meng men...@gmail.com wrote:

 Did you cache the data? Was it fully cached? The k-means
 implementation doesn't create many temporary objects. I guess you need
 more RAM to avoid GC triggered frequently. Please monitor the memory
 usage using YourKit or VisualVM. -Xiangrui

 On Wed, Feb 11, 2015 at 1:35 AM, lihu lihu...@gmail.com wrote:
  I just want to make the best use of CPU,  and test the performance of
 spark
  if there is a lot of task in a single node.
 
  On Wed, Feb 11, 2015 at 5:29 PM, Sean Owen so...@cloudera.com wrote:
 
  Good, worth double-checking that's what you got. That's barely 1GB per
  task though. Why run 48 if you have 24 cores?
 
  On Wed, Feb 11, 2015 at 9:03 AM, lihu lihu...@gmail.com wrote:
   I give 50GB to the executor,  so it seem that  there is no reason the
   memory
   is not enough.
  
   On Wed, Feb 11, 2015 at 4:50 PM, Sean Owen so...@cloudera.com
 wrote:
  
   Meaning, you have 128GB per machine but how much memory are you
 giving
   the executors?
  
   On Wed, Feb 11, 2015 at 8:49 AM, lihu lihu...@gmail.com wrote:
What do you mean?  Yes,I an see there  is some data put in the
 memory
from
the web ui.
   
On Wed, Feb 11, 2015 at 4:25 PM, Sean Owen so...@cloudera.com
wrote:
   
Are you actually using that memory for executors?
   
On Wed, Feb 11, 2015 at 8:17 AM, lihu lihu...@gmail.com wrote:
 Hi,
 I  run the kmeans(MLlib) in a cluster with 12 workers.
 Every
 work
 own a
 128G RAM, 24Core. I run 48 task in one machine. the total data
 is
 just
 40GB.

When the dimension of the data set is about 10^7, for every
 task
 the
 duration is about 30s, but the cost for GC is about 20s.

When I reduce the dimension to 10^4, then the gc is small.

 So why gc is so high when the dimension is larger? or this
 is
 the
 reason
 caused by MLlib?




   
   
   
   
--
Best Wishes!
   
Li Hu(李浒) | Graduate Student
Institute for Interdisciplinary Information Sciences(IIIS)
Tsinghua University, China
   
Email: lihu...@gmail.com
Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
   
   
  
  
  
  
   --
   Best Wishes!
  
   Li Hu(李浒) | Graduate Student
   Institute for Interdisciplinary Information Sciences(IIIS)
   Tsinghua University, China
  
   Email: lihu...@gmail.com
   Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
  
  
 
 
 
 
  --
  Best Wishes!
 
  Li Hu(李浒) | Graduate Student
  Institute for Interdisciplinary Information Sciences(IIIS)
  Tsinghua University, China
 
  Email: lihu...@gmail.com
  Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
 
 



Task not serializable problem in the multi-thread SQL query

2015-02-12 Thread lihu
I am trying to run Spark SQL queries from multiple threads.
Some sample code looks like this:

val sqlContext = new SQLContext(sc)
val rdd_query = sc.parallelize(data, part)
rdd_query.registerTempTable("MyTable")
sqlContext.cacheTable("MyTable")

val serverPool = Executors.newFixedThreadPool(3)
val loopCnt = 10

for (i <- 1 to loopCnt) {
    serverPool.execute(new Runnable() {
        override def run() {
            if (/* some condition */) {
                sqlContext.sql("SELECT * FROM ...").collect().foreach(println)
            }
            else {
                // some other query
            }
        }
    })
}

This throws a Task not serializable exception; if I do not use multiple
threads, it works well.
Since no object here should be non-serializable, what is the problem?


java.lang.Error: org.apache.spark.SparkException: Task not serializable
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:853)

Caused by: org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
at
org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:853)

Caused by: java.lang.NullPointerException
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
at
org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:853)

-- 
*Best Wishes!*


Re: Task not serializable problem in the multi-thread SQL query

2015-02-12 Thread lihu
Thanks very much, you're right.

I called sc.stop() before the thread pool had shut down.
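
For anyone who hits the same thing, a minimal sketch of the ordering fix
(sqlContext, sc and loopCnt are the ones from the snippet quoted below; the
query itself is a placeholder):

import java.util.concurrent.{Executors, TimeUnit}

val serverPool = Executors.newFixedThreadPool(3)
for (i <- 1 to loopCnt) {
  serverPool.execute(new Runnable {
    override def run(): Unit = {
      sqlContext.sql("SELECT * FROM MyTable").collect().foreach(println)
    }
  })
}

// Make sure every submitted query has finished before tearing Spark down.
serverPool.shutdown()
serverPool.awaitTermination(10, TimeUnit.MINUTES)
sc.stop()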

On Fri, Feb 13, 2015 at 7:04 AM, Michael Armbrust mich...@databricks.com
wrote:

 It looks to me like perhaps your SparkContext has shut down due to too
 many failures.  I'd look in the logs of your executors for more information.

 On Thu, Feb 12, 2015 at 2:34 AM, lihu lihu...@gmail.com wrote:

 I try to use the multi-thread to use the Spark SQL query.
 some sample code just like this:

 val sqlContext = new SqlContext(sc)
 val rdd_query = sc.parallelize(data,   part)
 rdd_query.registerTempTable(MyTable)
 sqlContext.cacheTable(MyTable)

 val serverPool = Executors.newFixedThreadPool(3)
 val loopCnt = 10

 for(i - 1 to loopCnt ){
 serverPool.execute(new Runnable(){
 override def run(){
 if( some condition){
 sqlContext.sql(SELECT * from
 ...).collect().foreach(println)
 }
 else{
 //some other query
 }

 }
 })
 }

 this will throw a Task serializable Exception, if I do not use the
 multi-thread, it works well.
 Since there is no object is not serializable? so what is the problem?


 java.lang.Error: org.apache.spark.SparkException: Task not serializable
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
 at java.lang.Thread.run(Thread.java:853)

 Caused by: org.apache.spark.SparkException: Task not serializable
 at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
 at
 org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
 at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
 at
 org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
 at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
 at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
 at java.lang.Thread.run(Thread.java:853)

 Caused by: java.lang.NullPointerException
 at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
 at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
 at
 org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
 at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
 at
 org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
 at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
 at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
 at java.lang.Thread.run(Thread.java:853)

 --
 *Best Wishes!*






-- 
*Best Wishes!*


Re: high GC in the Kmeans algorithm

2015-02-11 Thread lihu
I just want to make the best use of the CPU, and to test the performance of
Spark when there are a lot of tasks on a single node.

On Wed, Feb 11, 2015 at 5:29 PM, Sean Owen so...@cloudera.com wrote:

 Good, worth double-checking that's what you got. That's barely 1GB per
 task though. Why run 48 if you have 24 cores?

 On Wed, Feb 11, 2015 at 9:03 AM, lihu lihu...@gmail.com wrote:
  I give 50GB to the executor,  so it seem that  there is no reason the
 memory
  is not enough.
 
  On Wed, Feb 11, 2015 at 4:50 PM, Sean Owen so...@cloudera.com wrote:
 
  Meaning, you have 128GB per machine but how much memory are you giving
  the executors?
 
  On Wed, Feb 11, 2015 at 8:49 AM, lihu lihu...@gmail.com wrote:
   What do you mean?  Yes,I an see there  is some data put in the memory
   from
   the web ui.
  
   On Wed, Feb 11, 2015 at 4:25 PM, Sean Owen so...@cloudera.com
 wrote:
  
   Are you actually using that memory for executors?
  
   On Wed, Feb 11, 2015 at 8:17 AM, lihu lihu...@gmail.com wrote:
Hi,
I  run the kmeans(MLlib) in a cluster with 12 workers.  Every
work
own a
128G RAM, 24Core. I run 48 task in one machine. the total data is
just
40GB.
   
   When the dimension of the data set is about 10^7, for every task
the
duration is about 30s, but the cost for GC is about 20s.
   
   When I reduce the dimension to 10^4, then the gc is small.
   
So why gc is so high when the dimension is larger? or this is
 the
reason
caused by MLlib?
   
   
   
   
  
  
  
  
   --
   Best Wishes!
  
   Li Hu(李浒) | Graduate Student
   Institute for Interdisciplinary Information Sciences(IIIS)
   Tsinghua University, China
  
   Email: lihu...@gmail.com
   Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
  
  
 
 
 
 
  --
  Best Wishes!
 
  Li Hu(李浒) | Graduate Student
  Institute for Interdisciplinary Information Sciences(IIIS)
  Tsinghua University, China
 
  Email: lihu...@gmail.com
  Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
 
 




-- 
*Best Wishes!*

*Li Hu(李浒) | Graduate Student*

*Institute for Interdisciplinary Information Sciences(IIIS
http://iiis.tsinghua.edu.cn/)*
*Tsinghua University, China*

*Email: lihu...@gmail.com lihu...@gmail.com*
*Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
http://iiis.tsinghua.edu.cn/zh/lihu/*


high GC in the Kmeans algorithm

2015-02-11 Thread lihu
Hi,
    I run kmeans (MLlib) in a cluster with 12 workers. Every worker has
128GB RAM and 24 cores. I run 48 tasks on one machine; the total data is just
40GB.

   When the dimension of the data set is about 10^7, the duration of every
task is about 30s, but the cost of GC is about 20s.

   When I reduce the dimension to 10^4, the GC time is small.

So why is GC so high when the dimension is larger? Or is this caused by
MLlib?
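
For reference, a minimal sketch (not my actual job; it assumes an existing
SparkContext sc, a hypothetical input path, and MLlib's RDD[Vector] API) of
the kind of run described above:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.storage.StorageLevel

// One dense vector per line, ~10^7 space-separated features.
val data = sc.textFile("hdfs:///kmeans/input")
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  .persist(StorageLevel.MEMORY_ONLY)

val model = KMeans.train(data, 10 /* k */, 20 /* maxIterations */)
println("cost: " + model.computeCost(data))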


Re: use netty shuffle for network cause high gc time

2015-01-14 Thread lihu
I used Spark 1.1.

On Wed, Jan 14, 2015 at 2:24 PM, Aaron Davidson ilike...@gmail.com wrote:

 What version are you running? I think spark.shuffle.use.netty was a
 valid option only in Spark 1.1, where the Netty stuff was strictly
 experimental. Spark 1.2 contains an officially supported and much more
 thoroughly tested version under the property 
 spark.shuffle.blockTransferService,
 which is set to netty by default.

 On Tue, Jan 13, 2015 at 9:26 PM, lihu lihu...@gmail.com wrote:

 Hi,
  I just test groupByKey method on a 100GB data, the cluster is 20
 machine, each with 125GB RAM.

 At first I set  conf.set(spark.shuffle.use.netty, false) and run
 the experiment, and then I set conf.set(spark.shuffle.use.netty, true)
 again to re-run the experiment, but at the latter case, the GC time is much
 higher。


  I thought the latter one should be better, but it is not. So when should
 we use netty for network shuffle fetching?






-- 
*Best Wishes!*

*Li Hu(李浒) | Graduate Student*

*Institute for Interdisciplinary Information Sciences(IIIS
http://iiis.tsinghua.edu.cn/)*
*Tsinghua University, China*

*Email: lihu...@gmail.com lihu...@gmail.com*
*Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
http://iiis.tsinghua.edu.cn/zh/lihu/*


use netty shuffle for network cause high gc time

2015-01-13 Thread lihu
Hi,
 I just tested the groupByKey method on 100GB of data; the cluster is 20
machines, each with 125GB RAM.

At first I set conf.set("spark.shuffle.use.netty", "false") and ran
the experiment, and then I set conf.set("spark.shuffle.use.netty", "true")
and re-ran the experiment, but in the latter case the GC time was much
higher.


 I thought the latter one should be better, but it is not. So when should
we use netty for network shuffle fetching?
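
For completeness, a minimal sketch of the two configurations being compared
(assuming Spark 1.1 property names; as noted in the reply above, Spark 1.2
replaced this switch with spark.shuffle.blockTransferService):

import org.apache.spark.SparkConf

val nioConf = new SparkConf()
  .setAppName("groupByKey-nio")
  .set("spark.shuffle.use.netty", "false")   // default NIO-based fetcher in 1.1

val nettyConf = new SparkConf()
  .setAppName("groupByKey-netty")
  .set("spark.shuffle.use.netty", "true")    // experimental Netty fetcher in 1.1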


Re: Save RDD with partition information

2015-01-13 Thread lihu
there is no way to avoid shuffle if you use combine by key, no matter if
your data is cached in memory, because the shuffle write must write the
data into disk. And It seem that spark can not guarantee the similar
key(K1) goes to the Container_X.

you can use the tmpfs for your shuffle dir, this can improve your shuffle
write speed.

If the number of worker nodes is enough, then hundreds of GB is not quite
big to deal with.
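
A minimal sketch, within a single application and assuming an existing
SparkContext sc with hypothetical paths and key parsing, of how an explicit
partitioner keeps a stable key-to-partition mapping across RDDs (what the
question quoted below is asking about); note that it does not pin a partition
to a particular container/executor:

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

val partitioner = new HashPartitioner(200)

val hour0 = sc.textFile("hdfs:///events/hour=0")
  .map(line => (line.split(',')(0), 1L))
  .partitionBy(partitioner)
  .persist(StorageLevel.MEMORY_AND_DISK)

val hour1 = sc.textFile("hdfs:///events/hour=1")
  .map(line => (line.split(',')(0), 1L))
  .partitionBy(partitioner)
  .persist(StorageLevel.MEMORY_AND_DISK)

// Both RDDs share the same partitioner, so combining matching keys here
// uses narrow dependencies rather than another full shuffle.
val combined = hour0.cogroup(hour1)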


On Wed, Jan 14, 2015 at 5:30 AM, Puneet Kapoor puneet.cse.i...@gmail.com
wrote:

 Hi,

 I have a usecase where in I have hourly spark job which creates hourly
 RDDs, which are partitioned by keys.

 At the end of the day I need to access all of these RDDs and combine the
 Key/Value pairs over the day.

 If there is a key K1 in RDD0 (1st hour of day), RDD1 ... RDD23(last hour
 of the day); we need to combine all the values of this K1 using some logic.

 What I want to do is to avoid the shuffling at the end of the day since
 the data in huge ~ hundreds of GB.

 Questions
 ---
 1.) Is there a way that i can persist hourly RDDs with partition
 information and then while reading back the RDDs the partition information
 is restored.
 2.) Can i ensure that partitioning is similar for different hours. Like if
 K1 goes to container_X, it would go to the same container in the next hour
 and so on.

 Regards
 Puneet




Re: Save RDD with partition information

2015-01-13 Thread lihu
By the way, I am not sure whether the shuffled keys will go to the same
container.


Re: Is It Feasible for Spark 1.1 Broadcast to Fully Utilize the Ethernet Card Throughput?

2015-01-12 Thread lihu
What does your scenario look like? Do you need to use broadcasts heavily? If
not, it would be better to focus on other things.

At this time, there is no better method than TorrentBroadcast. Although the
blocks are transferred one by one, once a node has received the data it can
immediately act as a data source for the other nodes.


Re: do not assemble the spark example jar

2014-12-09 Thread lihu
Can this assembly get faster if we do not need Spark SQL or some of the other
components of Spark, e.g. if we only need Spark core?

On Wed, Nov 26, 2014 at 3:37 PM, lihu lihu...@gmail.com wrote:

 Matei, sorry for my last typo error. And the tip can improve about 30s in
 my computer.

 On Wed, Nov 26, 2014 at 3:34 PM, lihu lihu...@gmail.com wrote:

 Mater, thank you very much!
 After take your advice, the time for assembly from about 20min down to
 6min in my computer. that's a very big improvement.

 On Wed, Nov 26, 2014 at 12:32 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 BTW as another tip, it helps to keep the SBT console open as you make
 source changes (by just running sbt/sbt with no args). It's a lot faster
 the second time it builds something.

 Matei

 On Nov 25, 2014, at 8:31 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 You can do sbt/sbt assembly/assembly to assemble only the main package.

 Matei

 On Nov 25, 2014, at 7:50 PM, lihu lihu...@gmail.com wrote:

 Hi,
 The spark assembly is time costly. If  I only need
 the spark-assembly-1.1.0-hadoop2.3.0.jar, do not need
 the spark-examples-1.1.0-hadoop2.3.0.jar.  How to configure the spark to
 avoid assemble the example jar. I know *export SPARK_PREPEND_CLASSES=*
 *true* method can reduce the assembly, but I do not
 develop locally. Any advice?

 --
 *Best Wishes!*







 --
 *Best Wishes!*

 *Li Hu(李浒) | Graduate Student*

 *Institute for Interdisciplinary Information Sciences(IIIS
 http://iiis.tsinghua.edu.cn/)*
 *Tsinghua University, China*

 *Email: lihu...@gmail.com lihu...@gmail.com*
 *Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
 http://iiis.tsinghua.edu.cn/zh/lihu/*





 --
 *Best Wishes!*

 *Li Hu(李浒) | Graduate Student*

 *Institute for Interdisciplinary Information Sciences(IIIS
 http://iiis.tsinghua.edu.cn/)*
 *Tsinghua University, China*

 *Email: lihu...@gmail.com lihu...@gmail.com*
 *Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
 http://iiis.tsinghua.edu.cn/zh/lihu/*





-- 
*Best Wishes!*

*Li Hu(李浒) | Graduate Student*

*Institute for Interdisciplinary Information Sciences(IIIS
http://iiis.tsinghua.edu.cn/)*
*Tsinghua University, China*

*Email: lihu...@gmail.com lihu...@gmail.com*
*Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
http://iiis.tsinghua.edu.cn/zh/lihu/*


Re: How to convert RDD to JSON?

2014-12-08 Thread lihu
RDD is just a wrap of the scala collection, Maybe you can use the
.collect() method to get the scala collection type, you can then transfer
to a JSON object using the Scala method.
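
A minimal sketch of that approach (the case class and field names are
hypothetical; it builds the JSON strings by hand to stay dependency-free, but
any Scala/Java JSON library would work after collect()):

case class Person(name: String, age: Int)

val people = sc.parallelize(Seq(Person("alice", 30), Person("bob", 25)))

// Serialize each record to a JSON string, then bring the results to the driver.
val jsonStrings: Array[String] = people
  .map(p => s"""{"name":"${p.name}","age":${p.age}}""")
  .collect()

jsonStrings.foreach(println)   // {"name":"alice","age":30} ...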


Re: Viewing web UI after fact

2014-12-02 Thread lihu
 to launch a history server to serve the logs.

 Matei

 On August 13, 2014 at 2:03:08 AM, grzegorz-bialek (
 grzegorz.bia...@codilime.com) wrote:

 Hi,
 I wanted to access Spark web UI after application stops. I set
 spark.eventLog.enabled to true and logs are availaible
 in JSON format in /tmp/spark-event but web UI isn't available
 under address
 http://driver-node:4040
 I'm running Spark in standalone mode.

 What should I do to access web UI after application ends?

 Thanks,
 Grzegorz



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Viewing-web-UI-after-fact-tp12023.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -

 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
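
For reference, a minimal sketch of the event-log setup discussed above,
assuming the standard property names (the log directory is a hypothetical
example):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-events")   // or a file:/ URI

val sc = new SparkContext(conf)
// ... run the job ...
sc.stop()

// The written event logs can then be served after the fact by the history
// server (sbin/start-history-server.sh), with spark.history.fs.logDirectory
// pointing at the same directory, instead of the short-lived :4040 UI.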












-- 
*Best Wishes!*

*Li Hu(李浒) | Graduate Student*

*Institute for Interdisciplinary Information Sciences(IIIS
http://iiis.tsinghua.edu.cn/)*
*Tsinghua University, China*

*Email: lihu...@gmail.com lihu...@gmail.com*
*Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
http://iiis.tsinghua.edu.cn/zh/lihu/*


do not assemble the spark example jar

2014-11-25 Thread lihu
Hi,
The Spark assembly is time-costly. I only need
spark-assembly-1.1.0-hadoop2.3.0.jar and do not need
spark-examples-1.1.0-hadoop2.3.0.jar. How do I configure Spark to
avoid assembling the examples jar? I know the *export
SPARK_PREPEND_CLASSES=true* method
can reduce the assembly time, but I do not
develop locally. Any advice?

-- 
*Best Wishes!*


Re: do not assemble the spark example jar

2014-11-25 Thread lihu
Matei, thank you very much!
After taking your advice, the assembly time went from about 20min down to 6min
on my computer. That's a very big improvement.

On Wed, Nov 26, 2014 at 12:32 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 BTW as another tip, it helps to keep the SBT console open as you make
 source changes (by just running sbt/sbt with no args). It's a lot faster
 the second time it builds something.

 Matei

 On Nov 25, 2014, at 8:31 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 You can do sbt/sbt assembly/assembly to assemble only the main package.

 Matei

 On Nov 25, 2014, at 7:50 PM, lihu lihu...@gmail.com wrote:

 Hi,
 The spark assembly is time costly. If  I only need
 the spark-assembly-1.1.0-hadoop2.3.0.jar, do not need
 the spark-examples-1.1.0-hadoop2.3.0.jar.  How to configure the spark to
 avoid assemble the example jar. I know *export SPARK_PREPEND_CLASSES=*
 *true* method can reduce the assembly, but I do not
 develop locally. Any advice?

 --
 *Best Wishes!*







-- 
*Best Wishes!*

*Li Hu(李浒) | Graduate Student*

*Institute for Interdisciplinary Information Sciences(IIIS
http://iiis.tsinghua.edu.cn/)*
*Tsinghua University, China*

*Email: lihu...@gmail.com lihu...@gmail.com*
*Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
http://iiis.tsinghua.edu.cn/zh/lihu/*


Re: do not assemble the spark example jar

2014-11-25 Thread lihu
Matei, sorry for the typo in my last mail. And the tip improves the build by
about another 30s on my computer.

On Wed, Nov 26, 2014 at 3:34 PM, lihu lihu...@gmail.com wrote:

 Mater, thank you very much!
 After take your advice, the time for assembly from about 20min down to
 6min in my computer. that's a very big improvement.

 On Wed, Nov 26, 2014 at 12:32 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 BTW as another tip, it helps to keep the SBT console open as you make
 source changes (by just running sbt/sbt with no args). It's a lot faster
 the second time it builds something.

 Matei

 On Nov 25, 2014, at 8:31 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 You can do sbt/sbt assembly/assembly to assemble only the main package.

 Matei

 On Nov 25, 2014, at 7:50 PM, lihu lihu...@gmail.com wrote:

 Hi,
 The spark assembly is time costly. If  I only need
 the spark-assembly-1.1.0-hadoop2.3.0.jar, do not need
 the spark-examples-1.1.0-hadoop2.3.0.jar.  How to configure the spark to
 avoid assemble the example jar. I know *export SPARK_PREPEND_CLASSES=*
 *true* method can reduce the assembly, but I do not
 develop locally. Any advice?

 --
 *Best Wishes!*







 --
 *Best Wishes!*

 *Li Hu(李浒) | Graduate Student*

 *Institute for Interdisciplinary Information Sciences(IIIS
 http://iiis.tsinghua.edu.cn/)*
 *Tsinghua University, China*

 *Email: lihu...@gmail.com lihu...@gmail.com*
 *Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
 http://iiis.tsinghua.edu.cn/zh/lihu/*





-- 
*Best Wishes!*

*Li Hu(李浒) | Graduate Student*

*Institute for Interdisciplinary Information Sciences(IIIS
http://iiis.tsinghua.edu.cn/)*
*Tsinghua University, China*

*Email: lihu...@gmail.com lihu...@gmail.com*
*Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
http://iiis.tsinghua.edu.cn/zh/lihu/*


Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

2014-07-24 Thread lihu
Which code did you use? Is the problem caused by your own code or by
something in Spark itself?


On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com hsy...@gmail.com wrote:

 I have the same problem


 On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote:

 Hi,
 Everyone.  I have a piece of following code. When I run it,
 it occurred the error just like below, it seem that the SparkContext is not
 serializable, but i do not try to use the SparkContext except the broadcast.
 [In fact, this code is in the MLLib, I just try to broadcast the
  centerArrays ]

 it can success in the redeceBykey operation, but failed at the
 collect operation, this confused me.


 INFO DAGScheduler: Failed to run collect at KMeans.scala:235
 [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
 not serializable: java.io.NotSerializableException:
 org.apache.spark.SparkContext
 org.apache.spark.SparkException: Job aborted: Task not serializable:
 java.io.NotSerializableException: org.apache.spark.SparkContext
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)




 private def initKMeansParallel(data: RDD[Array[Double]]):
 Array[ClusterCenters] = {

 @transient val sc = data.sparkContext   // I try to add the 
 transient
 annotation here, but it doesn't work

 // Initialize each run's center to a random point
 val seed = new XORShiftRandom().nextInt()
 val sample = data.takeSample(true, runs, seed).toSeq
 val centers = Array.tabulate(runs)(r = ArrayBuffer(sample(r)))

 // On each step, sample 2 * k points on average for each run with
 probability proportional
 // to their squared distance from that run's current centers
 for (step - 0 until initializationSteps) {
   val centerArrays = sc.broadcast(centers.map(_.toArray))
   val sumCosts = data.flatMap { point =
 for (r - 0 until runs) yield (r,
 KMeans.pointCost(centerArrays.value(r), point))
   }.reduceByKey(_ + _).collectAsMap()
   //can pass at this point
   val chosen = data.mapPartitionsWithIndex { (index, points) =
 val rand = new XORShiftRandom(seed ^ (step  16) ^ index)
 for {
   p - points
   r - 0 until runs
   if rand.nextDouble()  KMeans.pointCost(centerArrays.value(r),
 p) * 2 * k / sumCosts(r)
 } yield (r, p)
   }.collect()
   // failed at this
 point.
   for ((r, p) - chosen) {
 centers(r) += p
   }
 }








-- 
*Best Wishes!*

 *Li Hu(李浒) | Graduate Student*

*Institute for Interdisciplinary Information Sciences(IIIS
http://iiis.tsinghua.edu.cn/) *
*Tsinghua University, China*

*Email: lihu...@gmail.com lihu...@gmail.com*
*Tel  : +86 15120081920*
*Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
http://iiis.tsinghua.edu.cn/zh/lihu/*


Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

2014-07-19 Thread lihu
Hi,
Everyone. I have the following piece of code. When I run it,
the error below occurred; it seems that the SparkContext is not
serializable, but I do not use the SparkContext except for the broadcast.
[In fact, this code is in MLlib; I just try to broadcast the
 centerArrays.]

It succeeds in the reduceByKey operation but fails at the collect
operation, which confuses me.


INFO DAGScheduler: Failed to run collect at KMeans.scala:235
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task not
serializable: java.io.NotSerializableException:
org.apache.spark.SparkContext
org.apache.spark.SparkException: Job aborted: Task not serializable:
java.io.NotSerializableException: org.apache.spark.SparkContext
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
 at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)




private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {

    @transient val sc = data.sparkContext   // I try to add the @transient
                                             // annotation here, but it doesn't work

    // Initialize each run's center to a random point
    val seed = new XORShiftRandom().nextInt()
    val sample = data.takeSample(true, runs, seed).toSeq
    val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))

    // On each step, sample 2 * k points on average for each run with
    // probability proportional
    // to their squared distance from that run's current centers
    for (step <- 0 until initializationSteps) {
      val centerArrays = sc.broadcast(centers.map(_.toArray))
      val sumCosts = data.flatMap { point =>
        for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
      }.reduceByKey(_ + _).collectAsMap()
      // can pass at this point
      val chosen = data.mapPartitionsWithIndex { (index, points) =>
        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
        for {
          p <- points
          r <- 0 until runs
          if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
        } yield (r, p)
      }.collect()
      // failed at this point.
      for ((r, p) <- chosen) {
        centers(r) += p
      }
    }


which function can generate a ShuffleMapTask

2014-06-23 Thread lihu
I see that the task will either be a ShuffleMapTask or be a ResultTask, I
wonder which function will generate a ShuffleMapTask, which will generate a
ResultTask?


spark-env.sh do not take effect.

2014-05-12 Thread lihu
Hi,
I set up a small cluster with 3 machines; every machine has 64GB RAM and 11
cores, and I use Spark 0.9.

   I have set spark-env.sh as follows:

   *SPARK_MASTER_IP=192.168.35.2*
*   SPARK_MASTER_PORT=7077*
*   SPARK_MASTER_WEBUI_PORT=12306*
*   SPARK_WORKER_CORES=3*
*   SPARK_WORKER_MEMORY=20g*
*  SPARK_JAVA_OPTS+=-Dspark.executor.memory=5g*

   but I see the following log on the master:

   *Spark Command: java -cp
:/usr/local/spark-0.9.0/conf:/usr/local/spark-0.9.0/assembly/target/scala-2.1
   0/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar
-Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m
-Xmx512m org.apache.spark.deploy.master.Master --ip 192.168.35.2 --port
7077 --webui-port 12306*
*  *

*  log4j:WARN No appenders could be found for logger
(akka.event.slf4j.Slf4jLogger).*
*  log4j:WARN Please initialize the log4j system properly.*
*  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.*
*  14/05/07 08:30:31 INFO Master: Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties*
*  14/05/07 08:30:31 INFO Master: Starting Spark master at
spark://192.168.35.2:7077 http://192.168.35.2:7077*
*  14/05/07 08:30:31 INFO MasterWebUI: Started Master web UI at
http://pug-master:12306 http://pug-master:12306*
*  14/05/07 08:30:31 INFO Master: I have been elected leader! New state:
ALIVE*
*  14/05/07 08:30:34 INFO Master: Registering worker 192.168.35.2:52972
http://192.168.35.2:52972 with 11 cores, 61.9 GB RAM*
*  14/05/07 08:30:34 INFO Master: Registering worker 192.168.35.2:43225
http://192.168.35.2:43225 with 11 cores, 61.9 GB RAM*


and the following log on my worker:

   *Spark Command: java -cp
:/usr/local/spark-0.9.0/conf:/usr/local/spark-0.9.0/assembly/target/scala-2.1
   0/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar
-Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m
-Xmx512m org.apache.spark.deploy.worker.Worker spark://192.168.35.2:7077
http://192.168.35.2:7077*
*   *

*  log4j:WARN No appenders could be found for logger
(akka.event.slf4j.Slf4jLogger).*
*  log4j:WARN Please initialize the log4j system properly.*
*  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.*
*  14/05/07 08:30:34 INFO Worker: Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties*
*  14/05/07 08:30:34 INFO Worker: Starting Spark worker pug1:43225 with 11
cores, 61.9 GB RAM*
*  14/05/07 08:30:34 INFO Worker: Spark home: /usr/local/spark-0.9.0*
*  14/05/07 08:30:34 INFO WorkerWebUI: Started Worker web UI at
http://pug1:8081 http://pug1:8081*
*  14/05/07 08:30:34 INFO Worker: Connecting to master
spark://192.168.35.2:7077...*
* 14/05/07 08:30:34 INFO Worker: Successfully registered with master
spark://192.168.35.2:7077 http://192.168.35.2:7077*



   I have checked that I did not misspell the configuration, and I used
rsync to sync the spark-env.sh file from the master to the workers, but
it seems that spark-env.sh does not take effect. I do not know what I have
missed.


spark.akka.frameSize setting problem

2014-03-28 Thread lihu
Hi,

I just ran a simple example to generate some data for the ALS
algorithm. My Spark version is 0.9, running in local mode; the memory of my
node is 108GB.


But when I set conf.set("spark.akka.frameSize", "4096"), the following
problem occurred, while when I do not set this, it runs well.


I see that the meaning of spark.akka.frameSize is the maximum message
size, in MB.

From the error info, I guess the frameSize is so large that the startup
timed out, but the conf meaning is the maximum, not the initial value. What
confuses me is the "Setting 'maximum-frame-size' must be at least 32000
bytes" info, because my frame size is greater than 32000. Can anyone
explain this? Or maybe the conf documentation should be more specific?
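
One plausible explanation, offered here as an assumption rather than something
confirmed in this thread: the akka maximum-frame-size appears to be derived
from spark.akka.frameSize (in MB) as a 32-bit Int count of bytes, so very
large settings overflow:

val frameSizeMb = 4096
val maxFrameBytes = frameSizeMb * 1024 * 1024   // Int arithmetic: 4096 * 2^20 = 2^32
println(maxFrameBytes)                          // prints 0 (the value wraps around)
// 0 < 32000, which would trip the "Setting 'maximum-frame-size' must be at
// least 32000 bytes" requirement; values up to about 2047 MB stay in range.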



 *ERROR OneForOneStrategy: Cannot instantiate transport
[akka.remote.transport.netty.NettyTransport]. Make sure it extends
[akka.remote.transport.Transport] and has constructor with
[akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config]
parameters*
*java.lang.IllegalArgumentException: Cannot instantiate transport
[akka.remote.transport.netty.NettyTransport]. Make sure it extends
[akka.remote.transport.Transport] and has constructor with
[akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config]
parameters*
* at
akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:620)*
* at
akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:618)*
*...*
* at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)*
* at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)*
*Caused by: java.lang.IllegalArgumentException: requirement failed: Setting
'maximum-frame-size' must be at least 32000 bytes*
* at scala.Predef$.require(Predef.scala:233)*
* at akka.util.Helpers$Requiring$.requiring$extension1(Helpers.scala:104)*
* ...*
* at scala.util.Success.flatMap(Try.scala:200)*
* at
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)*
* ... 19 more*
*14/03/28 07:25:27 ERROR Remoting: Remoting error: [Startup timed out] [*
*akka.remote.RemoteTransportException: Startup timed out*
* at
akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)*
* at akka.remote.Remoting.start(Remoting.scala:191)*
* ...*
* at sbt.Logger$$anon$4.apply(Logger.scala:90)*
* at sbt.TrapExit$App.run(TrapExit.scala:244)*
* at java.lang.Thread.run(Thread.java:744)*
*Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]*
* at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)*
* ...*
* at akka.remote.Remoting.start(Remoting.scala:173)*


Re: how to use the log4j for the standalone app

2014-03-10 Thread lihu
Thanks, but I do not want to log my own program's info; I just do not want
Spark to output all of its info to my console. I want Spark to write its log
output to a file which I specify.
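
A minimal sketch of one common workaround, assuming log4j 1.x (which Spark
used at the time): log4j reads the log4j.configuration system property when it
initializes, so it has to be set as a JVM property before the first logger
(and the SparkContext) is created, rather than through SparkConf:

// Point log4j at a custom properties file (URL syntax), then create the context.
System.setProperty("log4j.configuration",
  "file:/home/hadoop/spark/conf/log4j.properties")

val sc = new org.apache.spark.SparkContext(
  new org.apache.spark.SparkConf().setAppName("my-app"))

The same property can also be passed on the command line via
-Dlog4j.configuration=file:/..., and the properties file itself controls which
appenders and files receive Spark's output.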



On Tue, Mar 11, 2014 at 11:49 AM, Robin Cjc cjcro...@gmail.com wrote:

 Hi lihu,

 you can extends the org.apache.spark.logging class. Then use the function
 like logInfo(). Then will log according to the config in your
 log4j.properties.

 Best Regards,
 Chen Jingci


 On Tue, Mar 11, 2014 at 11:36 AM, lihu lihu...@gmail.com wrote:

 Hi,
    I use Spark 0.9, and when I run the spark-shell, I can log properly
 according to the log4j.properties in the SPARK_HOME/conf directory. But when I
 use a standalone app, I do not know how to configure logging.

   I use the SparkConf to set it, such as:

   val conf = new SparkConf()
   conf.set("log4j.configuration", "/home/hadoop/spark/conf/log4j.properties")

   but it does not work.

   This question may be simple, but I cannot find anything on the web, and
 I think this may be helpful for many people who are not familiar with Spark.





-- 
*Best Wishes!*

*Li Hu(李浒) | Graduate Student*

*Institute for Interdisciplinary Information Sciences(IIIS
http://iiis.tsinghua.edu.cn/)*
*Tsinghua University, China*

*Email: lihu...@gmail.com lihu...@gmail.com*
*Tel  : +86 15120081920*
*Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
http://iiis.tsinghua.edu.cn/zh/lihu/*