Re: [Yarn-Client]Can not access SparkUI

2015-10-26 Thread Earthson Lu
I/O exception (java.net.ConnectException) caught when processing request: 连接超时 (connection timed out)
2015-10-26 11:45:36,600 INFO org.apache.commons.httpclient.HttpMethodDirector: 
Retrying request


-- 
Earthson Lu

On October 26, 2015 at 15:30:21, Deng Ching-Mallete (och...@apache.org) wrote:

Hi Earthson,

Unfortunately, attachments aren't allowed in the list so they seemed to have 
been removed from your email. Anyway, what happens when you click the 
ApplicationMaster link?

Thanks,
Deng

On Mon, Oct 26, 2015 at 2:21 PM, Earthson  wrote:
We are using Spark 1.5.1 with `--master yarn`, Yarn RM is running in HA mode.

direct visit

[screenshot attachment removed]

click ApplicationMaster link

[screenshot attachment removed]

YARN RM log

[log attachment removed]

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-Client-Can-not-access-SparkUI-tp25197.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.






[Yarn-Client]Can not access SparkUI

2015-10-25 Thread Earthson
We are using Spark 1.5.1 with `--master yarn`, Yarn RM is running in HA mode.

direct visit

[screenshot attachment removed]

click ApplicationMaster link

[screenshot attachment removed]

YARN RM log

[log attachment removed]

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-Client-Can-not-access-SparkUI-tp25197.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: [Spark-1.4.0]jackson-databind conflict?

2015-06-14 Thread Earthson Lu
I've recompiled Spark 1.4.0 with fasterxml 2.5.x; it works fine now :)


-- 
Earthson Lu

On June 12, 2015 at 23:24:32, Sean Owen (so...@cloudera.com) wrote:

I see the same thing in an app that uses Jackson 2.5. Downgrading to  
2.4 made it work. I meant to go back and figure out if there's  
something that can be done to work around this in Spark or elsewhere,  
but for now, harmonize your Jackson version at 2.4.x if you can.  
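
For reference, a minimal build.sbt sketch of that harmonization (sbt 0.13-era syntax; the exact 2.4.x version is illustrative, not prescribed by this thread):

// Force every transitive Jackson module onto a single 2.4.x version.
dependencyOverrides ++= Set(
  "com.fasterxml.jackson.core"   %  "jackson-databind"     % "2.4.4",
  "com.fasterxml.jackson.core"   %  "jackson-core"         % "2.4.4",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.4.4"
)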

On Fri, Jun 12, 2015 at 4:20 PM, Earthson  wrote:  
> I'm using Play 2.4 with play-json 2.4. It works fine with Spark 1.3.1, but it
> failed after I upgraded Spark to 1.4.0 :(
>  
> sc.parallelize(1 to 1).count  
>  
> [info] com.fasterxml.jackson.databind.JsonMappingException: Could not find
> creator property with name 'id' (in class
> org.apache.spark.rdd.RDDOperationScope)
> [info] at [Source: {"id":"0","name":"parallelize"}; line: 1, column: 1]
> [info] at
> com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
> [info] at
> com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
> [info] at
> com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
> [info] at
> com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
> [info] at
> com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
> [info] at
> com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)
> [info] at
> com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)
> [info] at
> com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)
> [info] at
> com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)
> [info] at
> com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)
>
> --  
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-jackson-databind-conflict-tp23295.html
>   
> Sent from the Apache Spark User List mailing list archive at Nabble.com.  
>  
>  


[Spark-1.4.0]jackson-databind conflict?

2015-06-12 Thread Earthson
I'm using Play 2.4 with play-json 2.4. It works fine with Spark 1.3.1, but it
failed after I upgraded Spark to 1.4.0 :(

sc.parallelize(1 to 1).count


[info]   com.fasterxml.jackson.databind.JsonMappingException: Could not find
creator property with name 'id' (in class
org.apache.spark.rdd.RDDOperationScope)
[info]  at [Source: {"id":"0","name":"parallelize"}; line: 1, column: 1]
[info]   at
com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
[info]   at
com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
[info]   at
com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
[info]   at
com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
[info]   at
com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-jackson-databind-conflict-tp23295.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: what is the best way to implement mini batches?

2014-12-15 Thread Earthson Lu
Hi Imran, you are right. Purely sequential processing does not make sense as a use of Spark.

I think sequential processing of mini-batches works if the batch for each iteration is
large enough (so that each batch can be processed in parallel).

My point is that we should not run the mini-batches themselves in parallel, but it is
still possible to use a large batch and parallelize inside each batch (this seems to be
the way SGD is implemented in MLlib?).


-- 
Earthson Lu

On December 16, 2014 at 04:02:22, Imran Rashid (im...@therashids.com) wrote:

I'm a little confused by some of the responses.  It seems like there are two 
different issues being discussed here:

1.  How to turn a sequential algorithm into something that works on spark.  Eg 
deal with the fact that data is split into partitions which are processed in 
parallel (though within a partition, data is processed sequentially).  I'm 
guessing folks are particularly interested in online machine learning algos, 
which often have a point update and a mini batch update.

2.  How to convert a one-point-at-a-time view of the data into a mini-batches
view of the data.

(2) is pretty straightforward, eg with iterator.grouped (batchSize), or 
manually put data into your own buffer etc.  This works for creating mini 
batches *within* one partition in the context of spark.
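
For illustration, a minimal, self-contained sketch of that per-partition grouping (the per-batch computation is just a placeholder, not an actual update step):

import org.apache.spark.{SparkConf, SparkContext}

object MiniBatchSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("mini-batch-sketch").setMaster("local[2]"))
    val data = sc.parallelize(1 to 10000, numSlices = 8)
    val batchSize = 128
    // grouped() turns the per-partition iterator into an iterator of mini-batches,
    // so batches never cross partition boundaries.
    val perBatch = data.mapPartitions { it =>
      it.grouped(batchSize).map(batch => batch.sum) // placeholder for a per-batch update
    }
    println(perBatch.count()) // total number of mini-batches across all partitions
    sc.stop()
  }
}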

But problem (1) is completely separate, and there is no general solution.  It 
really depends on the specifics of what you're trying to do.

Some of the suggestions on this thread seem like they are basically just 
falling back to sequential data processing ... but really inefficient 
sequential processing.  Eg.  It doesn't make sense to do a full scan of your 
data with spark, and ignore all the records but the few that are in the next 
mini batch.

It's completely reasonable to just sequentially process all the data if that 
works for you.  But then it doesn't make sense to use spark, you're not gaining 
anything from it.

Hope this helps, apologies if I just misunderstood the other suggested 
solutions.

On Dec 14, 2014 8:35 PM, "Earthson"  wrote:
I think it could be done like this:

1. use mapPartitions to randomly drop some partitions
2. drop some elements randomly (within the selected partitions)
3. calculate the gradient step for the selected elements

I don't think a fixed batch size is needed, but it could be done:

1. zipWithIndex
2. create a ShuffledRDD based on the index (e.g. using index/10 as the key)
3. use mapPartitions to calculate each batch

I also have a question:

Can mini-batches run in parallel?
I think running all batches in parallel is just like full-batch GD in some cases.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20677.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: what is the best way to implement mini batches?

2014-12-14 Thread Earthson
I think it could be done like this:

1. use mapPartitions to randomly drop some partitions
2. drop some elements randomly (within the selected partitions)
3. calculate the gradient step for the selected elements

I don't think a fixed batch size is needed, but it could be done:

1. zipWithIndex
2. create a ShuffledRDD based on the index (e.g. using index/10 as the key)
3. use mapPartitions to calculate each batch (a sketch of this variant follows below)

I also have a question:

Can mini-batches run in parallel?
I think running all batches in parallel is just like full-batch GD in some cases.
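
A minimal sketch of the fixed-size variant in steps 1-3 above (the batch size and the per-batch computation are illustrative placeholders; this is not the MLlib implementation):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object FixedBatchSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("fixed-batch-sketch").setMaster("local[2]"))
    val data = sc.parallelize(1 to 10000, 8).map(_.toDouble)
    val batchSize = 10
    val batched = data.zipWithIndex()                   // (value, globalIndex)
      .map { case (v, idx) => (idx / batchSize, v) }    // key = batch id, i.e. index/batchSize
      .groupByKey()                                     // shuffle each batch's values together
    // each record is now (batchId, Iterable[Double]); a placeholder "step" per batch:
    val perBatch = batched.mapValues(batch => batch.sum / batch.size)
    println(perBatch.count())
    sc.stop()
  }
}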



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20677.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to get applicationId for yarn mode(both yarn-client and yarn-cluster mode)

2014-11-21 Thread Earthson
Finally, I've found two ways:

1. search the driver output for a line like "Submitted application
application_1416319392519_0115"
2. use a specific AppName and query YARN for the ApplicationId by name (see the
sketch below)
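
For option 2, a hedged sketch using the plain YARN client API to look the application up by name (the app name is a placeholder; later Spark releases also expose SparkContext.applicationId directly, which avoids this lookup entirely):

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import scala.collection.JavaConverters._

object FindAppIdByName {
  def main(args: Array[String]): Unit = {
    val appName = "my-spark-job"               // hypothetical value of spark.app.name
    val yarn = YarnClient.createYarnClient()
    yarn.init(new YarnConfiguration())
    yarn.start()
    val appId = yarn.getApplications.asScala   // all applications known to the ResourceManager
      .find(_.getName == appName)
      .map(_.getApplicationId.toString)
    println(appId.getOrElse("application not found"))
    yarn.stop()
  }
}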





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-applicationId-for-yarn-mode-both-yarn-client-and-yarn-cluster-mode-tp19462p19466.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




How to get applicationId for yarn mode(both yarn-client and yarn-cluster mode)

2014-11-21 Thread Earthson
Is there any way to get the yarn application_id inside the program?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-applicationId-for-yarn-mode-both-yarn-client-and-yarn-cluster-mode-tp19462.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: [SparkSQL] Convert JavaSchemaRDD to SchemaRDD

2014-10-16 Thread Earthson
I'm trying to provide an API for Java users, so I need to accept their
JavaSchemaRDDs and convert them to SchemaRDDs for the Scala side.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Convert-JavaSchemaRDD-to-SchemaRDD-tp16482p16641.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




[SparkSQL] Convert JavaSchemaRDD to SchemaRDD

2014-10-15 Thread Earthson
I don't know why JavaSchemaRDD.baseSchemaRDD is private[sql]. I also found
that DataTypeConversions is protected[sql].

Finally I found this workaround:

jrdd.registerTempTable("transform_tmp")
jrdd.sqlContext.sql("select * from transform_tmp")

Could anyone tell me: is it a good idea for me to *use Catalyst as the
DSL's execution engine?*

I am trying to build a DSL, and I want to confirm this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Convert-JavaSchemaRDD-to-SchemaRDD-tp16482.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: [PySpark][Python 2.7.8][Spark 1.0.2] count() with TypeError: an integer is required

2014-08-22 Thread Earthson
Do I have to deploy Python to every machine to make "$PYSPARK_PYTHON" work
correctly?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Python-2-7-8-Spark-1-0-2-count-with-TypeError-an-integer-is-required-tp12643p12651.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: [PySpark][Python 2.7.8][Spark 1.0.2] count() with TypeError: an integer is required

2014-08-22 Thread Earthson
I'm running pyspark with Python 2.7.8 under Virtualenv

System Python Version: Python 2.6.x 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Python-2-7-8-Spark-1-0-2-count-with-TypeError-an-integer-is-required-tp12643p12645.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




[PySpark][Python 2.7.8][Spark 1.0.2] count() with TypeError: an integer is required

2014-08-22 Thread Earthson
I am using PySpark with IPython notebook.


data = sc.parallelize(range(1000), 10)

#successful
data.map(lambda x: x+1).collect() 

#Error
data.count()



Something similar:
http://apache-spark-user-list.1001560.n3.nabble.com/Exception-on-simple-pyspark-script-td3415.html

But it does not explain how to solve it. Can anyone help?

---
Py4JJavaError Traceback (most recent call last)
 in ()
> 1 data.count()

/home/workspace/spark-1.0.2-bin-hadoop2/python/pyspark/rdd.pyc in
count(self)
735 3
736 """
--> 737 return self.mapPartitions(lambda i: [sum(1 for _ in
i)]).sum()
738 
739 def stats(self):

/home/workspace/spark-1.0.2-bin-hadoop2/python/pyspark/rdd.pyc in sum(self)
726 6.0
727 """
--> 728 return self.mapPartitions(lambda x:
[sum(x)]).reduce(operator.add)
729 
730 def count(self):

/home/workspace/spark-1.0.2-bin-hadoop2/python/pyspark/rdd.pyc in
reduce(self, f)
646 if acc is not None:
647 yield acc
--> 648 vals = self.mapPartitions(func).collect()
649 return reduce(f, vals)
650 

/home/workspace/spark-1.0.2-bin-hadoop2/python/pyspark/rdd.pyc in
collect(self)
610 """
611 with _JavaStackTrace(self.context) as st:
--> 612   bytesInJava = self._jrdd.collect().iterator()
613 return
list(self._collect_iterator_through_file(bytesInJava))
614 

/home/workspace/spark-1.0.2-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
535 answer = self.gateway_client.send_command(command)
536 return_value = get_return_value(answer, self.gateway_client,
--> 537 self.target_id, self.name)
538 
539 for temp_arg in temp_args:

/home/workspace/spark-1.0.2-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(


org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File "/home/workspace/spark-0.9.1-bin-hadoop2/python/pyspark/worker.py",
line 77, in main
serializer.dump_stream(func(split_index, iterator), outfile)
  File
"/home/workspace/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py",
line 182, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
  File
"/home/workspace/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py",
line 117, in dump_stream
for obj in iterator:
  File
"/home/workspace/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py",
line 171, in _batched
for item in iterator:
  File "/home/workspace/spark-1.0.2-bin-hadoop2/python/pyspark/rdd.py", line
642, in func
TypeError: an integer is required

   
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
   
org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:145)
org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
org.apache.spark.scheduler.Task.run(Task.scala:51)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActo

Re: [Spark 1.0.1][SparkSQL] reduce stage of shuffle is slow。

2014-07-29 Thread Earthson
Too much GC.

The task runs much faster with more memory (heap space). The CPU load is
still too high, and the network load is about 20+ MB/s (not high enough).

So what is the correct way to solve this GC problem? Are there other ways
besides using more memory?
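
Not a fix, but a hedged sketch of the knobs usually tried first for a GC-bound shuffle (all values are illustrative and depend on the cluster):

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleGcSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("shuffle-gc-sketch")
      .set("spark.executor.memory", "4g")            // more heap per executor
      .set("spark.sql.shuffle.partitions", "4096")   // smaller reduce tasks, less live data per task
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC -verbose:gc") // GC choice/logging
    val sc = new SparkContext(conf)
    // ... job ...
    sc.stop()
  }
}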



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SparkSQL-reduce-stage-of-shuffle-is-slow-tp10765p10922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: [Spark 1.0.1][SparkSQL] reduce stage of shuffle is slow。

2014-07-29 Thread Earthson
It's really strange that the CPU load is so high while both the disk and network I/O
load are so low. CLUSTER BY is just something similar to groupBy; why does it need so
much CPU resource?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SparkSQL-reduce-stage-of-shuffle-is-slow-tp10765p10851.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: [Spark 1.0.1][SparkSQL] reduce stage of shuffle is slow。

2014-07-28 Thread Earthson
"spark.MapOutputTrackerMasterActor: Asked to send map output locations for
shuffle 0 to" takes too much time. What should I do? What is the correct
configuration?

The blockManager times out if I use a small number of reduce partitions.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SparkSQL-reduce-stage-of-shuffle-is-slow-tp10765p10825.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


[Spark 1.0.1][SparkSQL] reduce stage of shuffle is slow。

2014-07-28 Thread Earthson
I'm using SparkSQL with Hive 0.13; here is the SQL for inserting a partition
with 2048 buckets.

  sqlsc.set("spark.sql.shuffle.partitions", "2048")
  hql("""|insert %s table mz_log
         |PARTITION (date='%s')
         |select * from tmp_mzlog
         |CLUSTER BY mzid
      """.stripMargin.format(overwrite, log_date))


env:

yarn-client mode with 80 executors, 2 cores per executor.

Data:

the original text log is about 1.1 TB.

- - -

the reduce stage is too slow.

[screenshot attachment removed]

here is the network usage; it's not the bottleneck.

[screenshot attachment removed]

and the CPU load is very high. Why?

[screenshot attachment removed]

here is the configuration (conf/spark-defaults.conf):


spark.ui.port
spark.akka.frameSize                        128
spark.akka.timeout                          600
spark.akka.threads                          8
spark.files.overwrite                       true
spark.executor.memory                       2G
spark.default.parallelism                   32
spark.shuffle.consolidateFiles              true
spark.kryoserializer.buffer.mb              128
spark.storage.blockManagerSlaveTimeoutMs    20
spark.serializer                            org.apache.spark.serializer.KryoSerializer


2 failed with MapTracker Error.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SparkSQL-reduce-stage-of-shuffle-is-slow-tp10765.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Why spark-submit command hangs?

2014-07-22 Thread Earthson
That's what my problem is:)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-spark-submit-command-hangs-tp10308p10394.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Why spark-submit command hangs?

2014-07-22 Thread Earthson
I've just had the same problem.

I'm using


$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client $JOBJAR
--class $JOBCLASS


It's really strange, because the log shows that 


14/07/22 16:16:58 INFO ui.SparkUI: Started SparkUI at
http://k1227.mzhen.cn:4040
14/07/22 16:16:58 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/07/22 16:16:58 INFO spark.SparkContext: Added JAR
/home/workspace/ci-demo/target/scala-2.10/SemiData-CIDemo-Library-assembly-0.1.jar
at http://192.168.7.37:53050/jars/SemiData-CIDemo-Library-assembly-0.1.jar
with timestamp 1406017018666
14/07/22 16:16:58 INFO cluster.YarnClusterScheduler: Created
YarnClusterScheduler
14/07/22 16:16:58 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown hook
for context org.apache.spark.SparkContext@41ecfc8c


Why does cluster.YarnClusterScheduler start? Where's the Client?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-spark-submit-command-hangs-tp10308p10392.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How could I set the number of executor?

2014-06-20 Thread Earthson
--num-executors seems to be available only with YARN.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-could-I-set-the-number-of-executor-tp7990p7992.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


How could I set the number of executor?

2014-06-20 Thread Earthson
"spark-submit" has an argument "--num-executors" to set the number of
executors, but how could I set it from anywhere else?

We're using Shark and want to change the number of executors. The number of
executors seems to be the same as the number of workers by default?

Shall we configure the executor number manually? (Is there an automatic way?)
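
A hedged sketch, assuming a YARN deployment: --num-executors corresponds to the spark.executor.instances property, so the executor count can also be set through configuration (values are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object ExecutorCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("executor-count-sketch")
      .set("spark.executor.instances", "40")   // same knob as --num-executors on YARN
      .set("spark.executor.cores", "2")
      .set("spark.executor.memory", "4g")
    val sc = new SparkContext(conf)
    // ... job ...
    sc.stop()
  }
}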



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-could-I-set-the-number-of-executor-tp7990.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


How to add jar with SparkSQL HiveContext?

2014-06-16 Thread Earthson
I have a problem with add jar command
hql("add jar /.../xxx.jar")
Error:
Exception in thread "main" java.lang.AssertionError: assertion failed: No
plan for AddJar ...
How could I do this job with HiveContext, I can't find any api to do it.
Does SparkSQL with Hive support UDF/UDAF?
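
For what it's worth, a hedged sketch of shipping the jar with the application itself instead of through Hive's add jar (the path is a placeholder; whether the UDF can then be registered through HiveContext is exactly the open question above):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HiveUdfJarSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hive-udf-jar-sketch"))
    sc.addJar("/path/to/xxx.jar")   // or pass --jars /path/to/xxx.jar to spark-submit
    val hive = new HiveContext(sc)
    // Registering and calling the UDF still depends on Hive UDF/UDAF support in SparkSQL.
    sc.stop()
  }
}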



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jar-with-SparkSQL-HiveContext-tp7713.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: problem about broadcast variable in iteration

2014-05-15 Thread Earthson
Is the RDD not cached?

Because recomputing may be required, every broadcast object is included in
the dependences of the RDDs; this may also cause a memory issue (when n and kv
are too large, as in your case).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problem-about-broadcast-variable-in-iteration-tp5479p5495.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


[Suggestion]Strange behavior for broadcast cleaning with spark 0.9

2014-05-15 Thread Earthson
I'm using Spark 0.9 with YARN.

Q: Why can the spark.cleaner.ttl setting remove a broadcast that is still in use? I
think the cleaner should not remove broadcasts that are still in the dependences of
some RDDs. This makes the value of spark.cleaner.ttl something that has to be set very
carefully.

POINT: the cleaner should not crash the task.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Suggestion-Strange-behavior-for-broadcast-cleaning-with-spark-0-9-tp5512.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Incredible slow iterative computation

2014-05-05 Thread Earthson
checkpoint seems to just add a checkpoint mark? You need an action after
marking it. I have tried it with success :)

newRdd = oldRdd.map(myFun).persist(myStorageLevel)
newRdd.checkpoint
newRdd.foreach(x => {}) // Force evaluation
newRdd.isCheckpointed // true here
oldRdd.unpersist(true)

If you have a new broadcast object for each step of the iteration, the broadcasts
will eat up all of the memory. You may need to set "spark.cleaner.ttl" to a small
enough value.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Incredible-slow-iterative-computation-tp4204p5407.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
Yes, I've tried.

The problem is that a new broadcast object is generated at every step, until they eat
up all of the memory.

I solved it by using RDD.checkpoint to remove the dependences on the old broadcast
objects, and by using cleaner.ttl to clean up these broadcast objects
automatically.

If there's a more elegant way to solve this problem, please tell me :)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Cache-issue-for-iteration-with-broadcast-tp5350p5385.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
RDD.checkpoint works fine, but spark.cleaner.ttl is really ugly for broadcast
cleaning. Maybe broadcasts could be removed automatically when they have no dependences.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5369.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
Using checkpoint. It removes dependences:)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5368.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
.set("spark.cleaner.ttl", "120") drops broadcast_0, which causes the exception
below. It is strange, because broadcast_0 is not needed (I have broadcast_3
instead) and the recent RDD is persisted, so there should be no need for
recomputing... What is the problem? Need help.


~~~
14/05/05 17:03:12 INFO storage.MemoryStore: ensureFreeSpace(52474562) called
with curMem=145126640, maxMem=1145359564
14/05/05 17:03:12 INFO storage.MemoryStore: Block broadcast_3 stored as
values to memory (estimated size 50.0 MB, free 903.9 MB)
14/05/05 17:03:12 INFO scheduler.DAGScheduler: shuffleToMapStage 0 --> 0
14/05/05 17:03:12 INFO scheduler.DAGScheduler: stageIdToStage 0 --> 0
14/05/05 17:03:12 INFO scheduler.DAGScheduler: stageIdToJobIds 0 --> 0
~

Exception in thread "Thread-3" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:154)
Caused by: org.apache.spark.SparkException: Job aborted: Task 9.0:48 failed
4 times (most recent failure: Exception failure:
java.io.FileNotFoundException: http://192.168.7.41:3503/broadcast_0)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5364.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
How could I do the iteration? Because persist is lazy and recomputing may be
required, the whole path of the iteration will be kept, so a memory overflow
cannot be escaped?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5359.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Cache issue for iteration with broadcast

2014-05-04 Thread Earthson
I tried using serialization instead of broadcast, and my program exited with an
error (beyond physical memory limits).

The large object cannot be released by GC because it is needed for
recomputing? So what is the recommended way to solve this problem?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5354.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Cache issue for iteration with broadcast

2014-05-04 Thread Earthson
Code here:
<https://github.com/Earthson/sparklda/blob/dev/src/main/scala/net/earthson/nlp/lda/lda.scala#L121>

Finally, the iteration still runs into recomputing...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5351.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Cache issue for iteration with broadcast

2014-05-04 Thread Earthson
A new broadcast object is generated for every iteration step; it may eat up
the memory and make persist fail.

The broadcast object should not be removed, because the RDD may be recomputed.
And while I am trying to prevent recomputing the RDD, I still need the old
broadcasts to release some memory.

I've tried to set "spark.cleaner.ttl", but my task runs into an error (broadcast
object not found); I think the task is recomputed. I don't think this is a good
idea, as it makes my code depend much more on my environment.

So I changed the persist level of my RDD to MEMORY_AND_DISK, but it runs into an
ERROR (broadcast object not found) too. And I removed the setting of
spark.cleaner.ttl, finally.

I think support for caching should be more friendly, and broadcast objects
should be cached too; flushing the old object to disk is much more essential
than doing so for the new one.
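
A minimal sketch, assuming a Spark 1.0+ API, of the pattern this thread converges on: one broadcast per round, released explicitly once the round's result is materialized, plus a periodic checkpoint to cut the lineage instead of relying on spark.cleaner.ttl (the data and "model" here are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastIterationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-iteration-sketch").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/spark-checkpoints")     // illustrative path

    var data = sc.parallelize(1 to 1000, 4).map(i => (i, i.toDouble)).cache()
    var model = Array.fill(10)(0.0)                   // placeholder "model"

    for (round <- 0 until 6) {
      val bModel = sc.broadcast(model)                // one broadcast per round
      val next = data.map { case (k, v) => (k, v + bModel.value.sum) }.cache()
      if (round % 3 == 2) next.checkpoint()           // cut the lineage periodically
      next.count()                                    // force evaluation (and the checkpoint)
      data.unpersist(blocking = false)                // previous round's cache is no longer needed
      bModel.unpersist(blocking = false)              // executors drop their copies; the driver can re-send if needed
      data = next
      model = model.map(_ + 1.0)                      // placeholder model update
    }
    sc.stop()
  }
}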



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: cache not work as expected for iteration?

2014-05-04 Thread Earthson
Thanks for the help, unpersist is exactly what I want :)

I see that Spark will remove some cached data automatically when memory is full;
it would be much more helpful if the eviction rule were something like LRU.

It seems that persist and cache are some kind of lazy?


--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/cache-not-work-as-expected-for-iteration-tp5292p5308.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


cache not work as expected for iteration?

2014-05-03 Thread Earthson
I'm using Spark for an LDA implementation. I need to cache an RDD for the next
step of Gibbs sampling; once the new result is cached, the previous cache could be
uncached. Something like an LRU cache should delete the previous cache because
it is never used afterwards, but the caching runs into confusion.

Here is the code :)
<https://github.com/Earthson/sparklda/blob/master/src/main/scala/net/earthson/nlp/lda/lda.scala#L99>

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n5292/sparklda_cache1.png>
 

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n5292/sparklda_cache2.png>
 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/cache-not-work-as-expected-for-iteration-tp5292.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: parallelize for a large Seq is extreamly slow.

2014-04-29 Thread Earthson
I think the real problem is "spark.akka.frameSize". It is too small for
passing the data: every executor failed, and with no executors left, the
task hangs.
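
A hedged sketch of raising that limit (spark.akka.frameSize is in MB on the Spark 0.x/1.x line; 128 is an illustrative value):

import org.apache.spark.{SparkConf, SparkContext}

object FrameSizeSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("frame-size-sketch")
      .set("spark.akka.frameSize", "128")   // MB; large task results/closures need this headroom
    val sc = new SparkContext(conf)
    // ... job ...
    sc.stop()
  }
}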



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p5075.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Why Spark require this object to be serializerable?

2014-04-29 Thread Earthson
Finally, I'm saving the RDDs to files and then reloading them. It works fine,
because Gibbs sampling for LDA is really slow anyway: it takes about 10 minutes to
sample 10k wiki documents for 10 rounds (1 round/min).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009p5036.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Why Spark require this object to be serializerable?

2014-04-28 Thread Earthson
The code is here:
https://github.com/Earthson/sparklda/blob/master/src/main/scala/net/earthson/nlp/lda/lda.scala

I've changed it from Broadcast to Serializable, and now it works :) But there
are too many RDD caches. Is that the problem?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009p5024.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Why Spark require this object to be serializerable?

2014-04-28 Thread Earthson
I've moved the SparkContext and the RDD to be parameters of train. And now it
tells me that the SparkContext needs to be serialized!

I think the problem is that the RDD is trying to stay lazy, and some
broadcast objects need to be generated dynamically, so the closure has the
SparkContext inside it, and so the task fails completely. Is this true?

I have no idea how to solve such a problem. I don't know how to make an
RDD non-lazy; rdd.cache seems to be lazy too.

So my only choice seems to be to save every step into HDFS and load it
every time?


object LDAInfo {
    type LDAData = rdd.RDD[(Long,String,Int)]

    def topicInfo(mwz:LDAInfo.LDAData) = {
        val nzw = mwz.map(x=>(x._3, x._2)).countByValue.toSeq
        val nz = nzw.map(x=>(x._1._1, x._2)).groupBy(_._1).mapValues(_.map(_._2).sum.toLong).toSeq
        new TopicInfo(nzw, nz)
    }
}

class ADLDAModel(val ntopics:Int, mwz:LDAInfo.LDAData) {
    //TODO: flexible save location and input location
    //TODO: Add Perplexity

    val nterms = mwz.map(_._2).distinct.count.toInt
    val modelinfo = new ModelInfo(ntopics, nterms)

    def train(sc:SparkContext, mwzo:LDAInfo.LDAData, round:Int, innerRound:Int = 20, saveStep:Int = 5) {
        var step = 0
        var mwz = mwzo
        val minfo = sc broadcast this.modelinfo
        println("Training Start!") //DEBUG
        for(i <- 0 until round) {
            println(s"Round ${i}")
            val runtinfo = sc broadcast LDAInfo.topicInfo(mwz)
            println("started?") //DEBUG
            mwz = mwz.mapPartitions(it=>GibbsMapper.mapper(minfo.value, runtinfo.value, it.toSeq, innerRound).toIterator).cache
            println("failed?") //DEBUG
            step = (step + 1) % saveStep
            if (step == 0 && i+1 != round)
                mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.${i}")
        }
        mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.final")
    }
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009p5020.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Why Spark require this object to be serializerable?

2014-04-28 Thread Earthson
The RDD holds "this" in its closure? How do I fix such a problem?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009p5015.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Why Spark require this object to be serializerable?

2014-04-28 Thread Earthson
Or what is the action that makes the RDD run? I don't want to save it as a file,
and I've tried cache(), but it seems to be some kind of lazy too.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009p5011.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Why Spark require this object to be serializerable?

2014-04-28 Thread Earthson
The problem is that this object can't be serializable: it holds an RDD field and
a SparkContext. But Spark shows an error that it needs to be serialized.

The order of my debug output is really strange.

~
Training Start!
Round 0
Hehe?
Hehe?
started?
failed?
Round 1
Hehe?
~

here is my code

 69 import org.apache.spark.rdd
 70 import org.apache.spark.SparkContext
 71
 72 class ADLDAModel(val sc:SparkContext, var mwz:rdd.RDD[(Long,String,Int)], val ntopics:Int) {
 73     //TODO: flexible save location and input location
 74     //TODO: Add Perplexity
 75
 76     val nterms = mwz.map(_._2).distinct.count.toInt
 77     val modelinfo = new ModelInfo(ntopics, nterms)
 78
 79     def nzw = {println("Hehe?");mwz.map(x=>(x._3, x._2)).countByValue.toSeq}
 80     def nz = nzw.map(x=>(x._1._1, x._2)).groupBy(_._1).mapValues(_.map(_._2).sum.toLong).toSeq
 81     def tinfo = new TopicInfo(this.nzw, this.nz)
 82
 83     def train(round:Int, innerRound:Int = 20, saveStep:Int = 5) {
 84         var step = 0
 85         val minfo = this.sc broadcast this.modelinfo
 86         println("Training Start!") //DEBUG
 87         for(i <- 0 until round) {
 88             println(s"Round ${i}")
 89             val tmptinfo = this.tinfo
 90             val runtinfo = this.sc broadcast tmptinfo
 91             println("started?") //DEBUG
 92             this.mwz = this.mwz.mapPartitions(it=>GibbsMapper.mapper(minfo.value, runtinfo.value, it.toSeq, innerRound).toIterator)
 93             println("failed?") //DEBUG
 94             step = (step + 1) % saveStep
 95             if (step == 0 && i+1 != round) this.mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.${i}")
 96         }
 97         this.mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.final")
 98     }
 99 }


Error Code Below:

14/04/29 09:48:42 INFO scheduler.DAGScheduler: Got job 3 (countByValue at
lda.scala:78) with 22 output partitions (allowLocal=false)
14/04/29 09:48:42 INFO scheduler.DAGScheduler: Final stage: Stage 4
(countByValue at lda.scala:78)
14/04/29 09:48:42 INFO scheduler.DAGScheduler: Parents of final stage:
List()
14/04/29 09:48:42 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/29 09:48:42 INFO scheduler.DAGScheduler: Submitting Stage 4
(MapPartitionsRDD[15] at countByValue at lda.scala:78), which has no missing
parents
14/04/29 09:48:42 INFO scheduler.DAGScheduler: Failed to run countByValue at
lda.scala:78
14/04/29 09:48:42 INFO yarn.ApplicationMaster: finishApplicationMaster with
FAILED
14/04/29 09:48:42 INFO impl.AMRMClientImpl: Waiting for application to be
successfully unregistered.
Exception in thread "Thread-3" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:154)
Caused by: org.apache.spark.SparkException: Job aborted: Task not
serializable: java.io.NotSerializableException:
net.earthson.nlp.lda.ADLDAModel
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorke

Re: parallelize for a large Seq is extreamly slow.

2014-04-27 Thread Earthson
It's my fault! I uploaded the wrong jar when I changed the number of partitions,
and now it just works fine :)

The size of word_mapping is 2444185.

So will it take a very long time for large-object serialization? I don't think
two million is very large, because the cost of handling such a size locally is
typically less than one second.

Thanks for the help:)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p4914.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: parallelize for a large Seq is extreamly slow.

2014-04-27 Thread Earthson
That doesn't work. I don't think it is just slow: it never ends (after 30+ hours,
I killed it).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p4900.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: parallelize for a large Seq is extreamly slow.

2014-04-25 Thread Earthson
parallelize is still so slow. 



package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Map[String,Int]])
        kryo.register(classOf[Map[String,Long]])
        kryo.register(classOf[Seq[(String,Long)]])
        kryo.register(classOf[Seq[(String,Int)]])
    }
}

object WFilter2 {
    def initspark(name:String) = {
        val conf = new SparkConf()
                    .setMaster("yarn-standalone")
                    .setAppName(name)
                    .setSparkHome(System.getenv("SPARK_HOME"))
                    .setJars(SparkContext.jarOfClass(this.getClass))
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    //.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
                    .set("spark.kryoserializer.buffer.mb", "256")
                    .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
                    .set("spark.cores.max", "30")
        new SparkContext(conf)
    }

    def main(args: Array[String]) {
        val spark = initspark("word filter mapping")
        val stopset = spark broadcast Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
        val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
        val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).countByKey
        val df_map = spark broadcast file.flatMap(x=>Set(x.split("\t"):_*).toBuffer).map((_,1)).reduceByKey(_+_).countByKey
        val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
        def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset.value contains w)) false else true
        val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
        spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
        mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
        spark.stop()
    }
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p4871.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: parallelize for a large Seq is extreamly slow.

2014-04-25 Thread Earthson
Using reduceByKey(_+_).countByKey instead of countByKey seems to be faster.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p4870.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: parallelize for a large Seq is extreamly slow.

2014-04-25 Thread Earthson
This error came just because I killed my app :(

Is there something wrong? The reduceByKey operation is extremely slow (slower
than with the default serializer).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p4869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: parallelize for a large Seq is extreamly slow.

2014-04-25 Thread Earthson
I've tried to set a larger buffer, but reduceByKey seems to have failed. Need
help :)

14/04/26 12:31:12 INFO cluster.CoarseGrainedSchedulerBackend: Shutting down
all executors
14/04/26 12:31:12 INFO cluster.CoarseGrainedSchedulerBackend: Asking each
executor to shut down
14/04/26 12:31:12 INFO scheduler.DAGScheduler: Failed to run countByKey at
filter_2.scala:35
14/04/26 12:31:12 INFO yarn.ApplicationMaster: finishApplicationMaster with
FAILED
Exception in thread "Thread-3"
org.apache.hadoop.yarn.exceptions.YarnException: Application doesn't exist
in cache appattempt_1398305021882_0069_01
at 
org.apache.hadoop.yarn.ipc.RPCUtil.getRemoteException(RPCUtil.java:45)
at
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.finishApplicationMaster(ApplicationMasterService.java:294)
at
org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.finishApplicationMaster(ApplicationMasterProtocolPBServiceImpl.java:75)
at
org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:97)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at 
org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
at
org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:101)
at
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:94)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at $Proxy12.finishApplicationMaster(Unknown Source)
at
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.unregisterApplicationMaster(AMRMClientImpl.java:311)
at
org.apache.spark.deploy.yarn.ApplicationMaster.finishApplicationMaster(ApplicationMaster.scala:320)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:165)
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.yarn.exceptions.YarnException):
Application doesn't exist in cache appattempt_1398305021882_0069_01
at 
org.apache.hadoop.yarn.ipc.RPCUtil.getRemoteException(RPCUtil.java:45)
at
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.finishApplicationMaster(ApplicationMasterService.java:294)
at
org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.finishApplicationMaster(ApplicationMasterProtocolPBServiceImpl.java:75)
at
org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:97)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)

at org.apache.hadoop.ipc.Client.call(Client.java:1347)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at $Proxy11.finishApplicationMaster(Unknown Source)
at
o

Re: parallelize for a large Seq is extreamly slow.

2014-04-24 Thread Earthson
Kryo fails with the exception below:

com.esotericsoftware.kryo.KryoException
(com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
required: 1)
com.esotericsoftware.kryo.io.Output.require(Output.java:138)
com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:21)
com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:19)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:124)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)

~~~

package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Map[String,Int]])
    }
}

object WFilter2 {
    def initspark(name:String) = {
        val conf = new SparkConf()
                    .setMaster("yarn-standalone")
                    .setAppName(name)
                    .setSparkHome(System.getenv("SPARK_HOME"))
                    .setJars(SparkContext.jarOfClass(this.getClass))
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
        new SparkContext(conf)
    }

    def main(args: Array[String]) {
        val spark = initspark("word filter mapping")
        val stopset = Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
        val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
        val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).countByKey
        val df_map = spark broadcast file.map(x=>Set(x.split("\t"):_*)).flatMap(_.map(_->1)).countByKey
        val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
        def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
        val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
        spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
        mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
        spark.stop()
    }
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p4809.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


parallelize for a large Seq is extreamly slow.

2014-04-24 Thread Earthson Lu
spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")

This line is too slow. There are about 2 million elements in word_mapping.

*Is there a good style for writing a large collection to hdfs?*

> import org.apache.spark._
> import SparkContext._
> import scala.io.Source
> object WFilter {
>     def main(args: Array[String]) {
>         val spark = new SparkContext("yarn-standalone","word filter",System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
>         val stopset = Source.fromURL(this.getClass.getResource("stoplist.txt")).getLines.map(_.trim).toSet
>         val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
>         val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).countByKey
>         val df_map = spark broadcast file.map(x=>Set(x.split("\t"):_*)).flatMap(_.map(_->1)).countByKey
>         val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
>         def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
>         val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
>         spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
>         mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
>         spark.stop()
>     }
> }
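
On the question above about writing a large driver-side collection to HDFS, a hedged sketch of the usual shape (paths and slice count are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object WriteCollectionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("write-collection-sketch").setMaster("local[2]"))
    // Hypothetical stand-in for word_mapping.value.toSeq (~2 million entries):
    val wordMapping: Seq[(String, Int)] = (1 to 2000000).map(i => (s"w$i", i))

    // An explicit slice count keeps each task's share of the serialized collection small.
    sc.parallelize(wordMapping, numSlices = 200)
      .map { case (w, id) => s"$w\t$id" }
      .saveAsTextFile("/tmp/word_mapping_sketch")    // illustrative output path
    sc.stop()
  }
}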


many thx:)

-- 

~
Perfection is achieved
not when there is nothing more to add
 but when there is nothing left to take away